commit f9c95bb129c1ebf43175bb9a1da94ea130e8699b
Author: Coder Agent <coder@agents.omni>
Date: Thu Apr 16 03:59:01 2026
perf(agentd): use shared tail reader for persistent watch and status
Add Omni.Agentd.SessionLog for tail and incremental JSONL reads.
Switch persistent watch startup and top activity reads to tail-based I/O.
Use shared reader in daemon follow loops and add partial-line tests.
Task-Id: t-804
diff --git a/Omni/Agentd.hs b/Omni/Agentd.hs
index d579c818..df5d10e1 100755
--- a/Omni/Agentd.hs
+++ b/Omni/Agentd.hs
@@ -58,6 +58,7 @@ import qualified Omni.Agent.Trace as Trace
-- import qualified Omni.Agent.Op.Runner as OpRunner
import qualified Omni.Agent.Watch as Watch
import qualified Omni.Agentd.Daemon as Daemon
+import qualified Omni.Agentd.SessionLog as SessionLog
import qualified Omni.Agents.Summarize as AgentSummary
import qualified Omni.Test as Test
import qualified Options.Applicative as Opt
@@ -1346,6 +1347,9 @@ runPersistentWatch runIds details = do
persistentWatchStartupLines :: Int
persistentWatchStartupLines = 60
+persistentWatchStartupTailBytes :: Integer
+persistentWatchStartupTailBytes = 512 * 1024
+
persistentWatchPollMicros :: Int
persistentWatchPollMicros = 250000
@@ -1358,9 +1362,8 @@ streamPersistentWatchSession sessionsDir timezone details runId = do
TextIO.hPutStrLn stderr <| "No persistent session log found for " <> runId <> ": " <> Text.pack sessionPath
pure (Exit.ExitFailure 1)
else do
- initialChunk <- BS.readFile sessionPath
- let (initialLineBytes, initialOffset) = splitCompleteJsonlBytes initialChunk
- initialLines = map decodeJsonlLine initialLineBytes
+ (initialLineBytes, initialOffset) <- SessionLog.readTailJsonlLines sessionPath persistentWatchStartupTailBytes
+ let initialLines = map SessionLog.decodeJsonlLine initialLineBytes
startupLines = takeLast persistentWatchStartupLines initialLines
rendered = mapMaybe (formatPersistentWatchLine timezone details runId) startupLines
forM_ rendered TextIO.putStrLn
@@ -1377,37 +1380,12 @@ followPersistentWatchSession sessionPath timezone details runId = loop
TextIO.hPutStrLn stderr <| "Persistent session log disappeared: " <> Text.pack sessionPath
pure (Exit.ExitFailure 1)
else do
- fileSize <- Dir.getFileSize sessionPath
- let startOffset = if fileSize < offset then 0 else offset
- toRead = fromIntegral (max 0 (fileSize - startOffset))
- chunk <-
- IO.withFile sessionPath IO.ReadMode <| \h -> do
- IO.hSeek h IO.AbsoluteSeek startOffset
- BS.hGet h toRead
- let (lineBytes, processedBytes) = splitCompleteJsonlBytes chunk
- rendered = mapMaybe (formatPersistentWatchLine timezone details runId <. decodeJsonlLine) lineBytes
+ (lineBytes, nextOffset) <- SessionLog.readJsonlFromOffset sessionPath offset
+ let rendered = mapMaybe (formatPersistentWatchLine timezone details runId <. SessionLog.decodeJsonlLine) lineBytes
forM_ rendered TextIO.putStrLn
unless (null rendered) <| IO.hFlush IO.stdout
Concurrent.threadDelay persistentWatchPollMicros
- loop (startOffset + processedBytes)
-
-splitCompleteJsonlBytes :: BS.ByteString -> ([BS.ByteString], Integer)
-splitCompleteJsonlBytes chunk
- | BS.null chunk = ([], 0)
- | BS.last chunk == '\n' =
- let lines' = filter (not <. BS.null) (BS.split '\n' chunk)
- in (lines', fromIntegral (BS.length chunk))
- | otherwise =
- let segments = BS.split '\n' chunk
- complete = case reverse segments of
- [] -> []
- (_partial : restRev) -> reverse restRev
- lines' = filter (not <. BS.null) complete
- consumed = sum (map ((+ 1) <. BS.length) complete)
- in (lines', fromIntegral consumed)
-
-decodeJsonlLine :: BS.ByteString -> Text
-decodeJsonlLine = TE.decodeUtf8With TextEncodingError.lenientDecode
+ loop nextOffset
formatPersistentWatchLine :: Time.TimeZone -> Bool -> Text -> Text -> Maybe Text
formatPersistentWatchLine timezone details runId line =
@@ -3112,17 +3090,20 @@ persistentStatusLabel pa =
Daemon.Failed -> "Failed"
Daemon.Unknown -> "Unknown"
+persistentActivityTailBytes :: Integer
+persistentActivityTailBytes = 256 * 1024
+
readPersistentActivity :: FilePath -> IO (Maybe Text, [Text], [Text])
readPersistentActivity sessionPath = do
exists <- Dir.doesFileExist sessionPath
if not exists
then pure (Nothing, [], [])
else do
- contentResult <- try @SomeException <| TextIO.readFile sessionPath
- case contentResult of
+ linesResult <- try @SomeException <| SessionLog.readTailJsonlLines sessionPath persistentActivityTailBytes
+ case linesResult of
Left _ -> pure (Nothing, [], [])
- Right content -> do
- let lines' = takeLast 80 (completeJsonLines content)
+ Right (lineBytes, _nextOffset) -> do
+ let lines' = takeLast 80 (map SessionLog.decodeJsonlLine lineBytes)
labels = mapMaybe persistentLineLabel lines'
tools = mapMaybe persistentToolLabel lines'
pure (listToMaybe (reverse labels), labels, tools)
diff --git a/Omni/Agentd/Daemon.hs b/Omni/Agentd/Daemon.hs
index 6da2af2e..6c979766 100644
--- a/Omni/Agentd/Daemon.hs
+++ b/Omni/Agentd/Daemon.hs
@@ -85,6 +85,7 @@ import qualified Database.SQLite.Simple as SQL
import qualified Network.HTTP.Simple as HTTP
import qualified Omni.Agent.Models as Models
import qualified Omni.Agent.Trace as Trace
+import qualified Omni.Agentd.SessionLog as SessionLog
import qualified Omni.Agents.Summarize as Summarize
import qualified Omni.Test as Test
import qualified System.Directory as Dir
@@ -1036,22 +1037,6 @@ data SessionStatusSnapshot = SessionStatusSnapshot
}
deriving (Show, Eq)
-readTailBytes :: FilePath -> Integer -> IO (Bool, BS.ByteString)
-readTailBytes path maxBytes =
- IO.withFile path IO.ReadMode <| \h -> do
- fileSize <- IO.hFileSize h
- let bytesToRead = min fileSize maxBytes
- startOffset = fileSize - bytesToRead
- IO.hSeek h IO.AbsoluteSeek startOffset
- chunk <- BS.hGet h (fromIntegral bytesToRead)
- pure (startOffset > 0, chunk)
-
-dropLeadingPartialLine :: BS.ByteString -> BS.ByteString
-dropLeadingPartialLine chunk =
- case BS.elemIndex 10 chunk of
- Nothing -> BS.empty
- Just idx -> BS.drop (idx + 1) chunk
-
sessionStatusSnapshotFromLine :: BS.ByteString -> Maybe SessionStatusSnapshot
sessionStatusSnapshotFromLine lineBytes = do
obj <- sessionObjectFromJsonlBytes lineBytes
@@ -1083,9 +1068,7 @@ inferSessionStatusSnapshot runId = do
if not exists
then pure Nothing
else do
- (truncated, chunk) <- readTailBytes sessionPath sessionStatusTailBytes
- let alignedChunk = if truncated then dropLeadingPartialLine chunk else chunk
- lineBytes = filter (not <. BS.null) (BS.split 10 alignedChunk)
+ (lineBytes, _nextOffset) <- SessionLog.readTailJsonlLines sessionPath sessionStatusTailBytes
pure (inferSessionStatusFromLines lineBytes)
reconcilePersistentDbStatus :: Time.UTCTime -> AgentInfo -> Maybe SessionStatusSnapshot -> AgentStatus
@@ -1747,7 +1730,7 @@ selectLogLines filterOpts lines' =
Just n -> List.drop (max 0 (length matching - n)) matching
decodeLogLine :: BS.ByteString -> Text
-decodeLogLine = TextEncoding.decodeUtf8With TextEncodingError.lenientDecode
+decodeLogLine = SessionLog.decodeJsonlLine
followSessionLogFile :: FilePath -> ParsedLogFilterOptions -> Integer -> IO Exit.ExitCode
followSessionLogFile sessionPath filterOpts = loop
@@ -1759,19 +1742,13 @@ followSessionLogFile sessionPath filterOpts = loop
TextIO.hPutStrLn IO.stderr <| "Session log disappeared while following: " <> Text.pack sessionPath
pure (Exit.ExitFailure 1)
else do
- fileSize <- Dir.getFileSize sessionPath
- let startOffset = if fileSize < offset then 0 else offset
- chunk <-
- IO.withFile sessionPath IO.ReadMode <| \h -> do
- IO.hSeek h IO.AbsoluteSeek startOffset
- BS.hGetContents h
- let (lineBytes, processedBytes) = splitCompleteJsonlLines chunk
- linesText = map decodeLogLine lineBytes
+ (lineBytes, nextOffset) <- SessionLog.readJsonlFromOffset sessionPath offset
+ let linesText = map decodeLogLine lineBytes
forM_ linesText <| \line ->
when (lineMatchesLogFilter filterOpts line) <| TextIO.putStrLn line
unless (null linesText) <| IO.hFlush IO.stdout
threadDelay 250000
- loop (startOffset + processedBytes)
+ loop nextOffset
streamAgentLogs :: Text -> Bool -> LogFilterOptions -> IO Exit.ExitCode
streamAgentLogs runId follow filterOptions = do
@@ -1787,7 +1764,7 @@ streamAgentLogs runId follow filterOptions = do
pure (Exit.ExitFailure 1)
Right parsedFilter -> do
initialChunk <- IO.withFile sessionPath IO.ReadMode BS.hGetContents
- let (initialLineBytes, initialOffset) = splitCompleteJsonlLines initialChunk
+ let (initialLineBytes, initialOffset) = SessionLog.splitCompleteJsonlLines initialChunk
initialLines = map decodeLogLine initialLineBytes
selected = selectLogLines parsedFilter initialLines
if follow
@@ -1922,40 +1899,17 @@ processSessionFile maildir state (runId, path) = do
if not exists
then pure state
else do
- fileSize <- Dir.getFileSize path
let previousOffset = Map.findWithDefault 0 runId (nsOffsets state)
- startOffset = if previousOffset > fileSize then 0 else previousOffset
- toRead = fromIntegral (max 0 (fileSize - startOffset))
- chunk <-
- IO.withFile path IO.ReadMode <| \h -> do
- IO.hSeek h IO.AbsoluteSeek startOffset
- BS.hGet h toRead
- let (lineBytes, processedBytes) = splitCompleteJsonlLines chunk
- nextOffset = startOffset + processedBytes
+ (lineBytes, nextOffset) <- SessionLog.readJsonlFromOffset path previousOffset
stateAfterLines <- foldM (processSessionLine runId maildir) state lineBytes
pure
stateAfterLines
{ nsOffsets = Map.insert runId nextOffset (nsOffsets stateAfterLines)
}
-splitCompleteJsonlLines :: BS.ByteString -> ([BS.ByteString], Integer)
-splitCompleteJsonlLines chunk
- | BS.null chunk = ([], 0)
- | BS.last chunk == 10 =
- let lines' = filter (not <. BS.null) (BS.split 10 chunk)
- in (lines', fromIntegral (BS.length chunk))
- | otherwise =
- let segments = BS.split 10 chunk
- complete = case reverse segments of
- [] -> []
- (_partial : restRev) -> reverse restRev
- lines' = filter (not <. BS.null) complete
- consumed = sum (map ((+ 1) <. BS.length) complete)
- in (lines', fromIntegral consumed)
-
processSessionLine :: Text -> FilePath -> NotifyState -> BS.ByteString -> IO NotifyState
processSessionLine runId maildir state lineBytes =
- let line = TextEncoding.decodeUtf8With TextEncodingError.lenientDecode lineBytes
+ let line = SessionLog.decodeJsonlLine lineBytes
in case parseNotifyEventLine line of
Nothing -> pure state
Just event -> deliverNotifyEvent maildir runId state event
@@ -2687,6 +2641,21 @@ runDaemon _ = do
-- * Tests
+withTempJsonlFile :: BS.ByteString -> (FilePath -> IO a) -> IO a
+withTempJsonlFile bytes action = do
+ tmpRoot <- Dir.getTemporaryDirectory
+ uuid <- UUID.toText </ UUID.nextRandom
+ let path = tmpRoot </> "agentd-session-tail-" <> Text.unpack (Text.take 8 uuid) <> ".jsonl"
+ cleanup = do
+ exists <- Dir.doesFileExist path
+ when exists <| Dir.removeFile path
+ Exception.finally
+ ( do
+ BS.writeFile path bytes
+ action path
+ )
+ cleanup
+
main :: IO ()
main = Test.run test
@@ -2960,6 +2929,17 @@ test =
case compileLogFilterOptions defaultLogFilterOptions {lfoType = Just "result"} of
Left err -> Test.assertFailure (Text.unpack err)
Right parsed -> selectLogLines parsed lines' Test.@=? [List.last lines'],
+ Test.unit "session log splitter drops trailing partial line" <| do
+ let chunk = TextEncoding.encodeUtf8 "{\"type\":\"a\"}\n{\"type\":\"b\"}\n{\"type\":\"partial\""
+ expectedConsumed = fromIntegral (BS.length (TextEncoding.encodeUtf8 "{\"type\":\"a\"}\n{\"type\":\"b\"}\n"))
+ (lines', consumed) = SessionLog.splitCompleteJsonlLines chunk
+ length lines' Test.@=? 2
+ consumed Test.@=? expectedConsumed,
+ Test.unit "tail reader drops leading partial line from truncated chunk" <| do
+ let content = TextEncoding.encodeUtf8 "1234567890\nabc\nxyz\n"
+ withTempJsonlFile content <| \path -> do
+ (lines', _nextOffset) <- SessionLog.readTailJsonlLines path 7
+ map SessionLog.decodeJsonlLine lines' Test.@=? ["xyz"],
Test.unit "log filter rejects invalid --since timestamps" <| do
case compileLogFilterOptions defaultLogFilterOptions {lfoSince = Just "not-a-time"} of
Left _ -> pure ()
diff --git a/Omni/Agentd/SessionLog.hs b/Omni/Agentd/SessionLog.hs
new file mode 100644
index 00000000..556961aa
--- /dev/null
+++ b/Omni/Agentd/SessionLog.hs
@@ -0,0 +1,85 @@
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-- | Shared JSONL tailing helpers for persistent agent session logs.
+module Omni.Agentd.SessionLog
+ ( readTailJsonlLines,
+ readJsonlFromOffset,
+ splitCompleteJsonlLines,
+ decodeJsonlLine,
+ )
+where
+
+import Alpha
+import qualified Data.ByteString as BS
+import qualified Data.Text.Encoding as TextEncoding
+import qualified Data.Text.Encoding.Error as TextEncodingError
+import qualified System.Directory as Dir
+import qualified System.IO as IO
+
+-- | Read up to @maxBytes@ from the end of a JSONL file.
+--
+-- Returns complete JSONL lines plus the next file offset for follow loops.
+readTailJsonlLines :: FilePath -> Integer -> IO ([BS.ByteString], Integer)
+readTailJsonlLines path maxBytes =
+ IO.withFile path IO.ReadMode <| \h -> do
+ fileSize <- IO.hFileSize h
+ let bytesToRead = min fileSize maxBytes
+ startOffset = fileSize - bytesToRead
+ IO.hSeek h IO.AbsoluteSeek startOffset
+ chunk <- BS.hGet h (fromIntegral bytesToRead)
+ let (alignedChunk, droppedBytes) =
+ if startOffset > 0
+ then dropLeadingPartialLine chunk
+ else (chunk, 0)
+ (lineBytes, processedBytes) = splitCompleteJsonlLines alignedChunk
+ nextOffset = startOffset + droppedBytes + processedBytes
+ pure (lineBytes, nextOffset)
+
+-- | Read complete JSONL lines from @offset@ to EOF.
+--
+-- Returns complete lines plus the next offset to continue following.
+readJsonlFromOffset :: FilePath -> Integer -> IO ([BS.ByteString], Integer)
+readJsonlFromOffset path offset = do
+ fileSize <- Dir.getFileSize path
+ let startOffset = if fileSize < offset then 0 else offset
+ toRead = fromIntegral (max 0 (fileSize - startOffset))
+ chunk <-
+ IO.withFile path IO.ReadMode <| \h -> do
+ IO.hSeek h IO.AbsoluteSeek startOffset
+ BS.hGet h toRead
+ let (lineBytes, processedBytes) = splitCompleteJsonlLines chunk
+ pure (lineBytes, startOffset + processedBytes)
+
+-- | Split a chunk into complete JSONL lines and consumed byte count.
+--
+-- If the chunk ends in a partial line, that suffix is excluded and must be
+-- carried over by the caller through the returned offset.
+splitCompleteJsonlLines :: BS.ByteString -> ([BS.ByteString], Integer)
+splitCompleteJsonlLines chunk
+ | BS.null chunk = ([], 0)
+ | BS.last chunk == 10 =
+ let lines' = filter (not <. BS.null) (BS.split 10 chunk)
+ in (lines', fromIntegral (BS.length chunk))
+ | otherwise =
+ let segments = BS.split 10 chunk
+ complete = case reverse segments of
+ [] -> []
+ (_partial : restRev) -> reverse restRev
+ lines' = filter (not <. BS.null) complete
+ consumed = sum (map ((+ 1) <. BS.length) complete)
+ in (lines', fromIntegral consumed)
+
+-- | Decode a JSONL line with lenient UTF-8 handling.
+decodeJsonlLine :: BS.ByteString -> Text
+decodeJsonlLine = TextEncoding.decodeUtf8With TextEncodingError.lenientDecode
+
+-- | Drop a potentially-partial first line from a tail chunk.
+--
+-- Returns (aligned chunk, dropped byte count).
+dropLeadingPartialLine :: BS.ByteString -> (BS.ByteString, Integer)
+dropLeadingPartialLine chunk =
+ case BS.elemIndex 10 chunk of
+ Nothing -> (BS.empty, fromIntegral (BS.length chunk))
+ Just idx ->
+ let dropped = idx + 1
+ in (BS.drop dropped chunk, fromIntegral dropped)