← Back to task

Commit f9c95bb1

commit f9c95bb129c1ebf43175bb9a1da94ea130e8699b
Author: Coder Agent <coder@agents.omni>
Date:   Thu Apr 16 03:59:01 2026

    perf(agentd): use shared tail reader for persistent watch and status
    
    Add Omni.Agentd.SessionLog for tail and incremental JSONL reads.
    
    Switch persistent watch startup and top activity reads to tail-based I/O.
    
    Use shared reader in daemon follow loops and add partial-line tests.
    
    Task-Id: t-804

diff --git a/Omni/Agentd.hs b/Omni/Agentd.hs
index d579c818..df5d10e1 100755
--- a/Omni/Agentd.hs
+++ b/Omni/Agentd.hs
@@ -58,6 +58,7 @@ import qualified Omni.Agent.Trace as Trace
 -- import qualified Omni.Agent.Op.Runner as OpRunner
 import qualified Omni.Agent.Watch as Watch
 import qualified Omni.Agentd.Daemon as Daemon
+import qualified Omni.Agentd.SessionLog as SessionLog
 import qualified Omni.Agents.Summarize as AgentSummary
 import qualified Omni.Test as Test
 import qualified Options.Applicative as Opt
@@ -1346,6 +1347,9 @@ runPersistentWatch runIds details = do
 persistentWatchStartupLines :: Int
 persistentWatchStartupLines = 60
 
+persistentWatchStartupTailBytes :: Integer
+persistentWatchStartupTailBytes = 512 * 1024
+
 persistentWatchPollMicros :: Int
 persistentWatchPollMicros = 250000
 
@@ -1358,9 +1362,8 @@ streamPersistentWatchSession sessionsDir timezone details runId = do
       TextIO.hPutStrLn stderr <| "No persistent session log found for " <> runId <> ": " <> Text.pack sessionPath
       pure (Exit.ExitFailure 1)
     else do
-      initialChunk <- BS.readFile sessionPath
-      let (initialLineBytes, initialOffset) = splitCompleteJsonlBytes initialChunk
-          initialLines = map decodeJsonlLine initialLineBytes
+      (initialLineBytes, initialOffset) <- SessionLog.readTailJsonlLines sessionPath persistentWatchStartupTailBytes
+      let initialLines = map SessionLog.decodeJsonlLine initialLineBytes
           startupLines = takeLast persistentWatchStartupLines initialLines
           rendered = mapMaybe (formatPersistentWatchLine timezone details runId) startupLines
       forM_ rendered TextIO.putStrLn
@@ -1377,37 +1380,12 @@ followPersistentWatchSession sessionPath timezone details runId = loop
           TextIO.hPutStrLn stderr <| "Persistent session log disappeared: " <> Text.pack sessionPath
           pure (Exit.ExitFailure 1)
         else do
-          fileSize <- Dir.getFileSize sessionPath
-          let startOffset = if fileSize < offset then 0 else offset
-              toRead = fromIntegral (max 0 (fileSize - startOffset))
-          chunk <-
-            IO.withFile sessionPath IO.ReadMode <| \h -> do
-              IO.hSeek h IO.AbsoluteSeek startOffset
-              BS.hGet h toRead
-          let (lineBytes, processedBytes) = splitCompleteJsonlBytes chunk
-              rendered = mapMaybe (formatPersistentWatchLine timezone details runId <. decodeJsonlLine) lineBytes
+          (lineBytes, nextOffset) <- SessionLog.readJsonlFromOffset sessionPath offset
+          let rendered = mapMaybe (formatPersistentWatchLine timezone details runId <. SessionLog.decodeJsonlLine) lineBytes
           forM_ rendered TextIO.putStrLn
           unless (null rendered) <| IO.hFlush IO.stdout
           Concurrent.threadDelay persistentWatchPollMicros
-          loop (startOffset + processedBytes)
-
-splitCompleteJsonlBytes :: BS.ByteString -> ([BS.ByteString], Integer)
-splitCompleteJsonlBytes chunk
-  | BS.null chunk = ([], 0)
-  | BS.last chunk == '\n' =
-      let lines' = filter (not <. BS.null) (BS.split '\n' chunk)
-       in (lines', fromIntegral (BS.length chunk))
-  | otherwise =
-      let segments = BS.split '\n' chunk
-          complete = case reverse segments of
-            [] -> []
-            (_partial : restRev) -> reverse restRev
-          lines' = filter (not <. BS.null) complete
-          consumed = sum (map ((+ 1) <. BS.length) complete)
-       in (lines', fromIntegral consumed)
-
-decodeJsonlLine :: BS.ByteString -> Text
-decodeJsonlLine = TE.decodeUtf8With TextEncodingError.lenientDecode
+          loop nextOffset
 
 formatPersistentWatchLine :: Time.TimeZone -> Bool -> Text -> Text -> Maybe Text
 formatPersistentWatchLine timezone details runId line =
@@ -3112,17 +3090,20 @@ persistentStatusLabel pa =
       Daemon.Failed -> "Failed"
       Daemon.Unknown -> "Unknown"
 
+persistentActivityTailBytes :: Integer
+persistentActivityTailBytes = 256 * 1024
+
 readPersistentActivity :: FilePath -> IO (Maybe Text, [Text], [Text])
 readPersistentActivity sessionPath = do
   exists <- Dir.doesFileExist sessionPath
   if not exists
     then pure (Nothing, [], [])
     else do
-      contentResult <- try @SomeException <| TextIO.readFile sessionPath
-      case contentResult of
+      linesResult <- try @SomeException <| SessionLog.readTailJsonlLines sessionPath persistentActivityTailBytes
+      case linesResult of
         Left _ -> pure (Nothing, [], [])
-        Right content -> do
-          let lines' = takeLast 80 (completeJsonLines content)
+        Right (lineBytes, _nextOffset) -> do
+          let lines' = takeLast 80 (map SessionLog.decodeJsonlLine lineBytes)
               labels = mapMaybe persistentLineLabel lines'
               tools = mapMaybe persistentToolLabel lines'
           pure (listToMaybe (reverse labels), labels, tools)
diff --git a/Omni/Agentd/Daemon.hs b/Omni/Agentd/Daemon.hs
index 6da2af2e..6c979766 100644
--- a/Omni/Agentd/Daemon.hs
+++ b/Omni/Agentd/Daemon.hs
@@ -85,6 +85,7 @@ import qualified Database.SQLite.Simple as SQL
 import qualified Network.HTTP.Simple as HTTP
 import qualified Omni.Agent.Models as Models
 import qualified Omni.Agent.Trace as Trace
+import qualified Omni.Agentd.SessionLog as SessionLog
 import qualified Omni.Agents.Summarize as Summarize
 import qualified Omni.Test as Test
 import qualified System.Directory as Dir
@@ -1036,22 +1037,6 @@ data SessionStatusSnapshot = SessionStatusSnapshot
   }
   deriving (Show, Eq)
 
-readTailBytes :: FilePath -> Integer -> IO (Bool, BS.ByteString)
-readTailBytes path maxBytes =
-  IO.withFile path IO.ReadMode <| \h -> do
-    fileSize <- IO.hFileSize h
-    let bytesToRead = min fileSize maxBytes
-        startOffset = fileSize - bytesToRead
-    IO.hSeek h IO.AbsoluteSeek startOffset
-    chunk <- BS.hGet h (fromIntegral bytesToRead)
-    pure (startOffset > 0, chunk)
-
-dropLeadingPartialLine :: BS.ByteString -> BS.ByteString
-dropLeadingPartialLine chunk =
-  case BS.elemIndex 10 chunk of
-    Nothing -> BS.empty
-    Just idx -> BS.drop (idx + 1) chunk
-
 sessionStatusSnapshotFromLine :: BS.ByteString -> Maybe SessionStatusSnapshot
 sessionStatusSnapshotFromLine lineBytes = do
   obj <- sessionObjectFromJsonlBytes lineBytes
@@ -1083,9 +1068,7 @@ inferSessionStatusSnapshot runId = do
   if not exists
     then pure Nothing
     else do
-      (truncated, chunk) <- readTailBytes sessionPath sessionStatusTailBytes
-      let alignedChunk = if truncated then dropLeadingPartialLine chunk else chunk
-          lineBytes = filter (not <. BS.null) (BS.split 10 alignedChunk)
+      (lineBytes, _nextOffset) <- SessionLog.readTailJsonlLines sessionPath sessionStatusTailBytes
       pure (inferSessionStatusFromLines lineBytes)
 
 reconcilePersistentDbStatus :: Time.UTCTime -> AgentInfo -> Maybe SessionStatusSnapshot -> AgentStatus
@@ -1747,7 +1730,7 @@ selectLogLines filterOpts lines' =
         Just n -> List.drop (max 0 (length matching - n)) matching
 
 decodeLogLine :: BS.ByteString -> Text
-decodeLogLine = TextEncoding.decodeUtf8With TextEncodingError.lenientDecode
+decodeLogLine = SessionLog.decodeJsonlLine
 
 followSessionLogFile :: FilePath -> ParsedLogFilterOptions -> Integer -> IO Exit.ExitCode
 followSessionLogFile sessionPath filterOpts = loop
@@ -1759,19 +1742,13 @@ followSessionLogFile sessionPath filterOpts = loop
           TextIO.hPutStrLn IO.stderr <| "Session log disappeared while following: " <> Text.pack sessionPath
           pure (Exit.ExitFailure 1)
         else do
-          fileSize <- Dir.getFileSize sessionPath
-          let startOffset = if fileSize < offset then 0 else offset
-          chunk <-
-            IO.withFile sessionPath IO.ReadMode <| \h -> do
-              IO.hSeek h IO.AbsoluteSeek startOffset
-              BS.hGetContents h
-          let (lineBytes, processedBytes) = splitCompleteJsonlLines chunk
-              linesText = map decodeLogLine lineBytes
+          (lineBytes, nextOffset) <- SessionLog.readJsonlFromOffset sessionPath offset
+          let linesText = map decodeLogLine lineBytes
           forM_ linesText <| \line ->
             when (lineMatchesLogFilter filterOpts line) <| TextIO.putStrLn line
           unless (null linesText) <| IO.hFlush IO.stdout
           threadDelay 250000
-          loop (startOffset + processedBytes)
+          loop nextOffset
 
 streamAgentLogs :: Text -> Bool -> LogFilterOptions -> IO Exit.ExitCode
 streamAgentLogs runId follow filterOptions = do
@@ -1787,7 +1764,7 @@ streamAgentLogs runId follow filterOptions = do
         pure (Exit.ExitFailure 1)
       Right parsedFilter -> do
         initialChunk <- IO.withFile sessionPath IO.ReadMode BS.hGetContents
-        let (initialLineBytes, initialOffset) = splitCompleteJsonlLines initialChunk
+        let (initialLineBytes, initialOffset) = SessionLog.splitCompleteJsonlLines initialChunk
             initialLines = map decodeLogLine initialLineBytes
             selected = selectLogLines parsedFilter initialLines
         if follow
@@ -1922,40 +1899,17 @@ processSessionFile maildir state (runId, path) = do
   if not exists
     then pure state
     else do
-      fileSize <- Dir.getFileSize path
       let previousOffset = Map.findWithDefault 0 runId (nsOffsets state)
-          startOffset = if previousOffset > fileSize then 0 else previousOffset
-          toRead = fromIntegral (max 0 (fileSize - startOffset))
-      chunk <-
-        IO.withFile path IO.ReadMode <| \h -> do
-          IO.hSeek h IO.AbsoluteSeek startOffset
-          BS.hGet h toRead
-      let (lineBytes, processedBytes) = splitCompleteJsonlLines chunk
-          nextOffset = startOffset + processedBytes
+      (lineBytes, nextOffset) <- SessionLog.readJsonlFromOffset path previousOffset
       stateAfterLines <- foldM (processSessionLine runId maildir) state lineBytes
       pure
         stateAfterLines
           { nsOffsets = Map.insert runId nextOffset (nsOffsets stateAfterLines)
           }
 
-splitCompleteJsonlLines :: BS.ByteString -> ([BS.ByteString], Integer)
-splitCompleteJsonlLines chunk
-  | BS.null chunk = ([], 0)
-  | BS.last chunk == 10 =
-      let lines' = filter (not <. BS.null) (BS.split 10 chunk)
-       in (lines', fromIntegral (BS.length chunk))
-  | otherwise =
-      let segments = BS.split 10 chunk
-          complete = case reverse segments of
-            [] -> []
-            (_partial : restRev) -> reverse restRev
-          lines' = filter (not <. BS.null) complete
-          consumed = sum (map ((+ 1) <. BS.length) complete)
-       in (lines', fromIntegral consumed)
-
 processSessionLine :: Text -> FilePath -> NotifyState -> BS.ByteString -> IO NotifyState
 processSessionLine runId maildir state lineBytes =
-  let line = TextEncoding.decodeUtf8With TextEncodingError.lenientDecode lineBytes
+  let line = SessionLog.decodeJsonlLine lineBytes
    in case parseNotifyEventLine line of
         Nothing -> pure state
         Just event -> deliverNotifyEvent maildir runId state event
@@ -2687,6 +2641,21 @@ runDaemon _ = do
 
 -- * Tests
 
+withTempJsonlFile :: BS.ByteString -> (FilePath -> IO a) -> IO a
+withTempJsonlFile bytes action = do
+  tmpRoot <- Dir.getTemporaryDirectory
+  uuid <- UUID.toText </ UUID.nextRandom
+  let path = tmpRoot </> "agentd-session-tail-" <> Text.unpack (Text.take 8 uuid) <> ".jsonl"
+      cleanup = do
+        exists <- Dir.doesFileExist path
+        when exists <| Dir.removeFile path
+  Exception.finally
+    ( do
+        BS.writeFile path bytes
+        action path
+    )
+    cleanup
+
 main :: IO ()
 main = Test.run test
 
@@ -2960,6 +2929,17 @@ test =
         case compileLogFilterOptions defaultLogFilterOptions {lfoType = Just "result"} of
           Left err -> Test.assertFailure (Text.unpack err)
           Right parsed -> selectLogLines parsed lines' Test.@=? [List.last lines'],
+      Test.unit "session log splitter drops trailing partial line" <| do
+        let chunk = TextEncoding.encodeUtf8 "{\"type\":\"a\"}\n{\"type\":\"b\"}\n{\"type\":\"partial\""
+            expectedConsumed = fromIntegral (BS.length (TextEncoding.encodeUtf8 "{\"type\":\"a\"}\n{\"type\":\"b\"}\n"))
+            (lines', consumed) = SessionLog.splitCompleteJsonlLines chunk
+        length lines' Test.@=? 2
+        consumed Test.@=? expectedConsumed,
+      Test.unit "tail reader drops leading partial line from truncated chunk" <| do
+        let content = TextEncoding.encodeUtf8 "1234567890\nabc\nxyz\n"
+        withTempJsonlFile content <| \path -> do
+          (lines', _nextOffset) <- SessionLog.readTailJsonlLines path 7
+          map SessionLog.decodeJsonlLine lines' Test.@=? ["xyz"],
       Test.unit "log filter rejects invalid --since timestamps" <| do
         case compileLogFilterOptions defaultLogFilterOptions {lfoSince = Just "not-a-time"} of
           Left _ -> pure ()
diff --git a/Omni/Agentd/SessionLog.hs b/Omni/Agentd/SessionLog.hs
new file mode 100644
index 00000000..556961aa
--- /dev/null
+++ b/Omni/Agentd/SessionLog.hs
@@ -0,0 +1,85 @@
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-- | Shared JSONL tailing helpers for persistent agent session logs.
+module Omni.Agentd.SessionLog
+  ( readTailJsonlLines,
+    readJsonlFromOffset,
+    splitCompleteJsonlLines,
+    decodeJsonlLine,
+  )
+where
+
+import Alpha
+import qualified Data.ByteString as BS
+import qualified Data.Text.Encoding as TextEncoding
+import qualified Data.Text.Encoding.Error as TextEncodingError
+import qualified System.Directory as Dir
+import qualified System.IO as IO
+
+-- | Read up to @maxBytes@ from the end of a JSONL file.
+--
+-- Returns complete JSONL lines plus the next file offset for follow loops.
+readTailJsonlLines :: FilePath -> Integer -> IO ([BS.ByteString], Integer)
+readTailJsonlLines path maxBytes =
+  IO.withFile path IO.ReadMode <| \h -> do
+    fileSize <- IO.hFileSize h
+    let bytesToRead = min fileSize maxBytes
+        startOffset = fileSize - bytesToRead
+    IO.hSeek h IO.AbsoluteSeek startOffset
+    chunk <- BS.hGet h (fromIntegral bytesToRead)
+    let (alignedChunk, droppedBytes) =
+          if startOffset > 0
+            then dropLeadingPartialLine chunk
+            else (chunk, 0)
+        (lineBytes, processedBytes) = splitCompleteJsonlLines alignedChunk
+        nextOffset = startOffset + droppedBytes + processedBytes
+    pure (lineBytes, nextOffset)
+
+-- | Read complete JSONL lines from @offset@ to EOF.
+--
+-- Returns complete lines plus the next offset to continue following.
+readJsonlFromOffset :: FilePath -> Integer -> IO ([BS.ByteString], Integer)
+readJsonlFromOffset path offset = do
+  fileSize <- Dir.getFileSize path
+  let startOffset = if fileSize < offset then 0 else offset
+      toRead = fromIntegral (max 0 (fileSize - startOffset))
+  chunk <-
+    IO.withFile path IO.ReadMode <| \h -> do
+      IO.hSeek h IO.AbsoluteSeek startOffset
+      BS.hGet h toRead
+  let (lineBytes, processedBytes) = splitCompleteJsonlLines chunk
+  pure (lineBytes, startOffset + processedBytes)
+
+-- | Split a chunk into complete JSONL lines and consumed byte count.
+--
+-- If the chunk ends in a partial line, that suffix is excluded and must be
+-- carried over by the caller through the returned offset.
+splitCompleteJsonlLines :: BS.ByteString -> ([BS.ByteString], Integer)
+splitCompleteJsonlLines chunk
+  | BS.null chunk = ([], 0)
+  | BS.last chunk == 10 =
+      let lines' = filter (not <. BS.null) (BS.split 10 chunk)
+       in (lines', fromIntegral (BS.length chunk))
+  | otherwise =
+      let segments = BS.split 10 chunk
+          complete = case reverse segments of
+            [] -> []
+            (_partial : restRev) -> reverse restRev
+          lines' = filter (not <. BS.null) complete
+          consumed = sum (map ((+ 1) <. BS.length) complete)
+       in (lines', fromIntegral consumed)
+
+-- | Decode a JSONL line with lenient UTF-8 handling.
+decodeJsonlLine :: BS.ByteString -> Text
+decodeJsonlLine = TextEncoding.decodeUtf8With TextEncodingError.lenientDecode
+
+-- | Drop a potentially-partial first line from a tail chunk.
+--
+-- Returns (aligned chunk, dropped byte count).
+dropLeadingPartialLine :: BS.ByteString -> (BS.ByteString, Integer)
+dropLeadingPartialLine chunk =
+  case BS.elemIndex 10 chunk of
+    Nothing -> (BS.empty, fromIntegral (BS.length chunk))
+    Just idx ->
+      let dropped = idx + 1
+       in (BS.drop dropped chunk, fromIntegral dropped)