← Back to task

Commit 9fa7697c

commit 9fa7697cd979eaa15a2479819463c3bdd86cc99a
Author: Ben Sima <ben@bensima.com>
Date:   Sun Nov 30 21:30:00 2025

    Add agent observability: event logging and storage
    
    - Add Omni/Agent/Event.hs with AgentEvent types
    - Add agent_events table schema and CRUD functions to Core.hs
    - Add new callbacks to Engine.hs: onAssistant, onToolResult, onComplete, onError
    - Wire event logging into Worker.hs with session tracking
    
    Events are now persisted to SQLite for each agent work session,
    enabling visibility into agent reasoning and tool usage.
    
    Task-Id: t-197.1
    Task-Id: t-197.2
    Task-Id: t-197.3

diff --git a/Omni/Agent/Engine.hs b/Omni/Agent/Engine.hs
index e0193415..1f5dcc84 100644
--- a/Omni/Agent/Engine.hs
+++ b/Omni/Agent/Engine.hs
@@ -254,7 +254,11 @@ data EngineConfig = EngineConfig
   { engineLLM :: LLM,
     engineOnCost :: Int -> Int -> IO (),
     engineOnActivity :: Text -> IO (),
-    engineOnToolCall :: Text -> Text -> IO ()
+    engineOnToolCall :: Text -> Text -> IO (),
+    engineOnAssistant :: Text -> IO (),
+    engineOnToolResult :: Text -> Bool -> Text -> IO (),
+    engineOnComplete :: IO (),
+    engineOnError :: Text -> IO ()
   }
 
 defaultEngineConfig :: EngineConfig
@@ -263,7 +267,11 @@ defaultEngineConfig =
     { engineLLM = defaultLLM,
       engineOnCost = \_ _ -> pure (),
       engineOnActivity = \_ -> pure (),
-      engineOnToolCall = \_ _ -> pure ()
+      engineOnToolCall = \_ _ -> pure (),
+      engineOnAssistant = \_ -> pure (),
+      engineOnToolResult = \_ _ _ -> pure (),
+      engineOnComplete = pure (),
+      engineOnError = \_ -> pure ()
     }
 
 data AgentResult = AgentResult
@@ -495,26 +503,30 @@ runAgent engineCfg agentCfg userPrompt = do
 
     loop :: LLM -> [Tool] -> Map.Map Text Tool -> [Message] -> Int -> Int -> Int -> IO (Either Text AgentResult)
     loop llm tools' toolMap msgs iteration totalCalls totalTokens
-      | iteration >= maxIter =
-          pure
-            <| Left
-            <| "Max iterations ("
-            <> tshow maxIter
-            <> ") reached"
+      | iteration >= maxIter = do
+          let errMsg = "Max iterations (" <> tshow maxIter <> ") reached"
+          engineOnError engineCfg errMsg
+          pure <| Left errMsg
       | otherwise = do
           engineOnActivity engineCfg <| "Iteration " <> tshow (iteration + 1)
           result <- chatWithUsage llm tools' msgs
           case result of
-            Left err -> pure (Left err)
+            Left err -> do
+              engineOnError engineCfg err
+              pure (Left err)
             Right chatRes -> do
               let msg = chatMessage chatRes
                   tokens = maybe 0 usageTotalTokens (chatUsage chatRes)
                   cost = estimateCost (llmModel llm) tokens
               engineOnCost engineCfg tokens cost
               let newTokens = totalTokens + tokens
+              let assistantText = msgContent msg
+              unless (Text.null assistantText) <|
+                engineOnAssistant engineCfg assistantText
               case msgToolCalls msg of
                 Nothing -> do
                   engineOnActivity engineCfg "Agent completed"
+                  engineOnComplete engineCfg
                   pure
                     <| Right
                     <| AgentResult
@@ -526,6 +538,7 @@ runAgent engineCfg agentCfg userPrompt = do
                       }
                 Just [] -> do
                   engineOnActivity engineCfg "Agent completed (empty tool calls)"
+                  engineOnComplete engineCfg
                   pure
                     <| Right
                     <| AgentResult
@@ -552,22 +565,22 @@ executeToolCalls engineCfg toolMap = traverse executeSingle
           argsText = fcArguments (tcFunction tc)
           callId = tcId tc
       engineOnActivity engineCfg <| "Executing tool: " <> name
+      engineOnToolCall engineCfg name argsText
       case Map.lookup name toolMap of
         Nothing -> do
           let errMsg = "Tool not found: " <> name
-          engineOnToolCall engineCfg name errMsg
+          engineOnToolResult engineCfg name False errMsg
           pure <| Message ToolRole errMsg Nothing (Just callId)
         Just tool -> do
           case Aeson.decode (BL.fromStrict (TE.encodeUtf8 argsText)) of
             Nothing -> do
               let errMsg = "Invalid JSON arguments: " <> argsText
-              engineOnToolCall engineCfg name errMsg
+              engineOnToolResult engineCfg name False errMsg
               pure <| Message ToolRole errMsg Nothing (Just callId)
             Just args -> do
               resultValue <- toolExecute tool args
               let resultText = TE.decodeUtf8 (BL.toStrict (Aeson.encode resultValue))
-                  summary = Text.take 100 resultText
-              engineOnToolCall engineCfg name summary
+              engineOnToolResult engineCfg name True resultText
               pure <| Message ToolRole resultText Nothing (Just callId)
 
 estimateCost :: Text -> Int -> Int
diff --git a/Omni/Agent/Event.hs b/Omni/Agent/Event.hs
new file mode 100644
index 00000000..2b400779
--- /dev/null
+++ b/Omni/Agent/Event.hs
@@ -0,0 +1,180 @@
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-- | Agent Event types for observability and streaming.
+--
+-- Captures all events during agent execution for logging,
+-- streaming to web UI, and future interactive chat.
+module Omni.Agent.Event
+  ( AgentEvent (..),
+    EventType (..),
+    eventToJSON,
+    eventFromJSON,
+    formatEventForTerminal,
+  )
+where
+
+import Alpha
+import Data.Aeson ((.=))
+import qualified Data.Aeson as Aeson
+import qualified Data.Text as Text
+import Data.Time (UTCTime, defaultTimeLocale, formatTime)
+
+-- | Types of agent events
+data EventType
+  = Assistant -- LLM text response
+  | ToolCall -- Tool invocation with arguments
+  | ToolResult -- Tool execution result
+  | UserMessage -- For future interactive chat
+  | Cost -- Token usage and cost info
+  | Error -- Failures and errors
+  | Complete -- Session ended successfully
+  deriving (Show, Eq, Read)
+
+-- | A single agent event with timestamp and content
+data AgentEvent = AgentEvent
+  { eventType :: EventType,
+    eventTimestamp :: UTCTime,
+    eventContent :: Aeson.Value
+  }
+  deriving (Show, Eq)
+
+-- | Convert event to JSON for storage/streaming
+eventToJSON :: AgentEvent -> Aeson.Value
+eventToJSON e =
+  Aeson.object
+    [ "type" .= show (eventType e),
+      "timestamp" .= eventTimestamp e,
+      "content" .= eventContent e
+    ]
+
+-- | Parse event from JSON
+eventFromJSON :: Aeson.Value -> Maybe AgentEvent
+eventFromJSON v = do
+  obj <- case v of
+    Aeson.Object o -> Just o
+    _ -> Nothing
+  typeStr <- case Aeson.lookup "type" (Aeson.toList obj) of
+    Just (Aeson.String t) -> Just (Text.unpack t)
+    _ -> Nothing
+  eventT <- readMaybe typeStr
+  ts <- case Aeson.lookup "timestamp" (Aeson.toList obj) of
+    Just t -> Aeson.parseMaybe Aeson.parseJSON t
+    _ -> Nothing
+  content <- Aeson.lookup "content" (Aeson.toList obj)
+  pure
+    AgentEvent
+      { eventType = eventT,
+        eventTimestamp = ts,
+        eventContent = content
+      }
+  where
+    Aeson.lookup k pairs = snd </ find (\(k', _) -> k' == k) pairs
+    Aeson.toList (Aeson.Object o) = map (first Aeson.toText) (Aeson.toList o)
+    Aeson.toList _ = []
+    Aeson.toText = id
+    first f (a, b) = (f a, b)
+
+-- | Format event for terminal display
+formatEventForTerminal :: AgentEvent -> Text
+formatEventForTerminal e =
+  let ts = Text.pack <| formatTime defaultTimeLocale "%H:%M:%S" (eventTimestamp e)
+      content = case eventType e of
+        Assistant -> case eventContent e of
+          Aeson.String t -> "Assistant: " <> truncate' 100 t
+          _ -> "Assistant: <message>"
+        ToolCall -> case eventContent e of
+          Aeson.Object _ ->
+            let toolName = getField "tool" (eventContent e)
+             in "Tool: " <> toolName
+          _ -> "Tool: <call>"
+        ToolResult -> case eventContent e of
+          Aeson.Object _ ->
+            let toolName = getField "tool" (eventContent e)
+                success = getField "success" (eventContent e)
+             in "Result: " <> toolName <> " (" <> success <> ")"
+          _ -> "Result: <result>"
+        UserMessage -> case eventContent e of
+          Aeson.String t -> "User: " <> truncate' 100 t
+          _ -> "User: <message>"
+        Cost -> case eventContent e of
+          Aeson.Object _ ->
+            let tokens = getField "tokens" (eventContent e)
+                cents = getField "cents" (eventContent e)
+             in "Cost: " <> tokens <> " tokens, " <> cents <> " cents"
+          _ -> "Cost: <info>"
+        Error -> case eventContent e of
+          Aeson.String t -> "Error: " <> t
+          _ -> "Error: <error>"
+        Complete -> "Complete"
+   in "[" <> ts <> "] " <> content
+  where
+    truncate' n t = if Text.length t > n then Text.take n t <> "..." else t
+    getField key val = case val of
+      Aeson.Object o -> case Aeson.lookup key (Aeson.toList o) of
+        Just (Aeson.String s) -> s
+        Just (Aeson.Number n) -> Text.pack (show n)
+        Just (Aeson.Bool b) -> if b then "ok" else "failed"
+        _ -> "<" <> key <> ">"
+      _ -> "<" <> key <> ">"
+      where
+        Aeson.lookup k pairs = snd </ find (\(k', _) -> k' == k) pairs
+        Aeson.toList (Aeson.Object o') = map (first' Aeson.toText) (Aeson.toList o')
+        Aeson.toList _ = []
+        Aeson.toText = id
+        first' f (a, b) = (f a, b)
+
+-- Helper constructors for common events
+
+mkAssistantEvent :: UTCTime -> Text -> AgentEvent
+mkAssistantEvent ts content =
+  AgentEvent
+    { eventType = Assistant,
+      eventTimestamp = ts,
+      eventContent = Aeson.String content
+    }
+
+mkToolCallEvent :: UTCTime -> Text -> Aeson.Value -> AgentEvent
+mkToolCallEvent ts toolName args =
+  AgentEvent
+    { eventType = ToolCall,
+      eventTimestamp = ts,
+      eventContent = Aeson.object ["tool" .= toolName, "args" .= args]
+    }
+
+mkToolResultEvent :: UTCTime -> Text -> Bool -> Text -> AgentEvent
+mkToolResultEvent ts toolName success output =
+  AgentEvent
+    { eventType = ToolResult,
+      eventTimestamp = ts,
+      eventContent =
+        Aeson.object
+          [ "tool" .= toolName,
+            "success" .= success,
+            "output" .= output
+          ]
+    }
+
+mkCostEvent :: UTCTime -> Int -> Int -> AgentEvent
+mkCostEvent ts tokens cents =
+  AgentEvent
+    { eventType = Cost,
+      eventTimestamp = ts,
+      eventContent = Aeson.object ["tokens" .= tokens, "cents" .= cents]
+    }
+
+mkErrorEvent :: UTCTime -> Text -> AgentEvent
+mkErrorEvent ts msg =
+  AgentEvent
+    { eventType = Error,
+      eventTimestamp = ts,
+      eventContent = Aeson.String msg
+    }
+
+mkCompleteEvent :: UTCTime -> AgentEvent
+mkCompleteEvent ts =
+  AgentEvent
+    { eventType = Complete,
+      eventTimestamp = ts,
+      eventContent = Aeson.Null
+    }
diff --git a/Omni/Agent/Worker.hs b/Omni/Agent/Worker.hs
index 61c392bb..1c69b153 100644
--- a/Omni/Agent/Worker.hs
+++ b/Omni/Agent/Worker.hs
@@ -243,6 +243,15 @@ runWithEngine repo task = do
       -- Select model based on task complexity (simple heuristic)
       let model = selectModel task
 
+      -- Generate session ID for event logging
+      sessionId <- TaskCore.generateSessionId
+      let tid = TaskCore.taskId task
+
+      -- Helper to log events to DB
+      let logEvent eventType content = do
+            let contentJson = TE.decodeUtf8 (BSL.toStrict (Aeson.encode content))
+            TaskCore.insertAgentEvent tid sessionId eventType contentJson
+
       -- Build Engine config with callbacks
       totalCostRef <- newIORef (0 :: Int)
       let engineCfg =
@@ -253,11 +262,26 @@ runWithEngine repo task = do
                     },
                 Engine.engineOnCost = \tokens cost -> do
                   modifyIORef' totalCostRef (+ cost)
-                  AgentLog.log <| "Cost: " <> tshow cost <> " cents (" <> tshow tokens <> " tokens)",
+                  AgentLog.log <| "Cost: " <> tshow cost <> " cents (" <> tshow tokens <> " tokens)"
+                  logEvent "cost" (Aeson.object [("tokens", Aeson.toJSON tokens), ("cents", Aeson.toJSON cost)]),
                 Engine.engineOnActivity = \activity -> do
                   AgentLog.log <| "[engine] " <> activity,
-                Engine.engineOnToolCall = \toolName result -> do
-                  AgentLog.log <| "[tool] " <> toolName <> ": " <> Text.take 100 result
+                Engine.engineOnToolCall = \toolName args -> do
+                  AgentLog.log <| "[tool] " <> toolName
+                  logEvent "tool_call" (Aeson.object [("tool", Aeson.toJSON toolName), ("args", Aeson.toJSON args)]),
+                Engine.engineOnAssistant = \msg -> do
+                  AgentLog.log <| "[assistant] " <> Text.take 200 msg
+                  logEvent "assistant" (Aeson.String msg),
+                Engine.engineOnToolResult = \toolName success output -> do
+                  let statusStr = if success then "ok" else "failed"
+                  AgentLog.log <| "[result] " <> toolName <> " (" <> statusStr <> "): " <> Text.take 100 output
+                  logEvent "tool_result" (Aeson.object [("tool", Aeson.toJSON toolName), ("success", Aeson.toJSON success), ("output", Aeson.toJSON output)]),
+                Engine.engineOnComplete = do
+                  AgentLog.log "[engine] Complete"
+                  logEvent "complete" Aeson.Null,
+                Engine.engineOnError = \err -> do
+                  AgentLog.log <| "[error] " <> err
+                  logEvent "error" (Aeson.String err)
               }
 
       -- Build Agent config
diff --git a/Omni/Task/Core.hs b/Omni/Task/Core.hs
index c930b2c9..699e4f3a 100644
--- a/Omni/Task/Core.hs
+++ b/Omni/Task/Core.hs
@@ -508,6 +508,23 @@ runMigrations conn = do
   migrateTable conn "tasks" tasksColumns
   migrateTable conn "retry_context" retryContextColumns
   migrateTable conn "facts" factsColumns
+  createAgentEventsTable conn
+
+-- | Create agent_events table if it doesn't exist
+createAgentEventsTable :: SQL.Connection -> IO ()
+createAgentEventsTable conn = do
+  SQL.execute_
+    conn
+    "CREATE TABLE IF NOT EXISTS agent_events (\
+    \ id INTEGER PRIMARY KEY AUTOINCREMENT, \
+    \ task_id TEXT NOT NULL, \
+    \ session_id TEXT NOT NULL, \
+    \ timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, \
+    \ event_type TEXT NOT NULL, \
+    \ content TEXT NOT NULL \
+    \)"
+  SQL.execute_ conn "CREATE INDEX IF NOT EXISTS idx_agent_events_task ON agent_events(task_id)"
+  SQL.execute_ conn "CREATE INDEX IF NOT EXISTS idx_agent_events_session ON agent_events(session_id)"
 
 -- | Expected columns for task_activity table (name, type, nullable)
 taskActivityColumns :: [(Text, Text)]
@@ -1565,3 +1582,101 @@ deleteFact :: Int -> IO ()
 deleteFact fid =
   withDb <| \conn ->
     SQL.execute conn "DELETE FROM facts WHERE id = ?" (SQL.Only fid)
+
+-- ============================================================================
+-- Agent Events (for observability)
+-- ============================================================================
+
+-- | Stored agent event record
+data StoredEvent = StoredEvent
+  { storedEventId :: Int,
+    storedEventTaskId :: Text,
+    storedEventSessionId :: Text,
+    storedEventTimestamp :: UTCTime,
+    storedEventType :: Text,
+    storedEventContent :: Text
+  }
+  deriving (Show, Eq, Generic)
+
+instance ToJSON StoredEvent
+
+instance FromJSON StoredEvent
+
+instance SQL.FromRow StoredEvent where
+  fromRow =
+    StoredEvent
+      </ SQL.field
+      <*> SQL.field
+      <*> SQL.field
+      <*> SQL.field
+      <*> SQL.field
+      <*> SQL.field
+
+-- | Generate a new session ID (timestamp-based for simplicity)
+generateSessionId :: IO Text
+generateSessionId = do
+  now <- getCurrentTime
+  pure <| "s-" <> T.pack (show now)
+
+-- | Insert an agent event
+insertAgentEvent :: Text -> Text -> Text -> Text -> IO ()
+insertAgentEvent taskId sessionId eventType content =
+  withDb <| \conn ->
+    SQL.execute
+      conn
+      "INSERT INTO agent_events (task_id, session_id, event_type, content) VALUES (?, ?, ?, ?)"
+      (taskId, sessionId, eventType, content)
+
+-- | Get all events for a task (most recent session)
+getEventsForTask :: Text -> IO [StoredEvent]
+getEventsForTask taskId = do
+  maybeSession <- getLatestSessionForTask taskId
+  case maybeSession of
+    Nothing -> pure []
+    Just sid -> getEventsForSession sid
+
+-- | Get all events for a specific session
+getEventsForSession :: Text -> IO [StoredEvent]
+getEventsForSession sessionId =
+  withDb <| \conn ->
+    SQL.query
+      conn
+      "SELECT id, task_id, session_id, timestamp, event_type, content \
+      \FROM agent_events WHERE session_id = ? ORDER BY id ASC"
+      (SQL.Only sessionId)
+
+-- | Get all sessions for a task
+getSessionsForTask :: Text -> IO [Text]
+getSessionsForTask taskId =
+  withDb <| \conn -> do
+    rows <-
+      SQL.query
+        conn
+        "SELECT DISTINCT session_id FROM agent_events WHERE task_id = ? ORDER BY session_id DESC"
+        (SQL.Only taskId) ::
+        IO [SQL.Only Text]
+    pure [sid | SQL.Only sid <- rows]
+
+-- | Get the most recent session ID for a task
+getLatestSessionForTask :: Text -> IO (Maybe Text)
+getLatestSessionForTask taskId =
+  withDb <| \conn -> do
+    rows <-
+      SQL.query
+        conn
+        "SELECT session_id FROM agent_events WHERE task_id = ? ORDER BY id DESC LIMIT 1"
+        (SQL.Only taskId) ::
+        IO [SQL.Only Text]
+    pure <| case rows of
+      [SQL.Only sid] -> Just sid
+      _ -> Nothing
+
+-- | Get events for a task since a given event ID (for streaming/polling)
+getEventsSince :: Text -> Int -> IO [StoredEvent]
+getEventsSince sessionId lastId =
+  withDb <| \conn ->
+    SQL.query
+      conn
+      "SELECT id, task_id, session_id, timestamp, event_type, content \
+      \FROM agent_events WHERE session_id = ? AND id > ? ORDER BY id ASC"
+      (sessionId, lastId)