Split Biz/PodcastItLater/Worker.py (2199 lines) into smaller modules.
Important: keep tests near the code they are testing. For example: a Feature class and its TestFeature test cases should be in the same file.
Create these new files in Biz/PodcastItLater/:
1. Worker/Processor.py: move the ArticleProcessor class and its methods (lines 103-720)
2. Worker/TextProcessing.py: move the text processing functions
3. Worker/Jobs.py: move the job management functions
Steps:
1. Create the new files with proper imports
2. Update Worker.py to import from the new modules
3. Run: bild --test Biz/PodcastItLater/Worker.py
4. Verify all tests pass
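For reference, the colocated-test convention (per the Database/TestDatabase example in Biz/PodcastItLater.md) looks like this; the names here are illustrative only:

import Omni.Test as Test


class Feature:
    """Main implementation."""

    def run(self) -> str:
        return "ok"


class TestFeature(Test.TestCase):
    """Tests live directly below the class they cover, in the same file."""

    def test_run(self) -> None:
        self.assertEqual(Feature().run(), "ok")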
I'll start by exploring the current Worker.py file to understand its structure and dependencies.
Biz/PodcastItLater/Worker.py @1-100
1: """Background worker for processing article-to-podcast conversions."""
2:
3: # : dep boto3
4: # : dep botocore
5: # : dep openai
6: # : dep psutil
7: # : dep pydub
8: # : dep pytest
9: # : dep pytest-asyncio
10: # : dep pytest-mock
11: # : dep trafilatura
12: # : out podcastitlater-worker
13: # : run ffmpeg
14: import Biz.PodcastItLater.Core as Core
15: import boto3 # type: ignore[import-untyped]
16: import concurrent.futures
17: import io
18: import json
19: import logging
20: import Omni.App as App
21: import Omni.Log as Log
22: import Omni.Test as Test
23: import openai
24: import operator
25: import os
26: import psutil # type: ignore[import-untyped]
27: import pytest
28: import signal
29: import sys
30: import tempfile
31: import threading
32: import time
33: import trafilatura
34: import typing
35: import unittest.mock
36: from botocore.exceptions import ClientError # type: ignore[import-untyped]
37: from datetime import datetime
38: from datetime import timedelta
39: from datetime import timezone
40: from pathlib import Path
41: from pydub import AudioSegment # type: ignore[import-untyped]
42: from typing import Any
43:
44: logger = logging.getLogger(__name__)
45: Log.setup(logger)
46:
47: # Configuration from environment variables
48: OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
49: S3_ENDPOINT = os.getenv("S3_ENDPOINT")
50: S3_BUCKET = os.getenv("S3_BUCKET")
51: S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY")
52: S3_SECRET_KEY = os.getenv("S3_SECRET_KEY")
53: area = App.from_env()
54:
55: # Worker configuration
56: MAX_CONTENT_LENGTH = 5000 # characters for TTS
57: MAX_ARTICLE_SIZE = 500_000 # 500KB character limit for articles
58: POLL_INTERVAL = 30 # seconds
59: MAX_RETRIES = 3
60: TTS_MODEL = "tts-1"
61: TTS_VOICE = "alloy"
62: MEMORY_THRESHOLD = 80 # Percentage threshold for memory usage
63: CROSSFADE_DURATION = 500 # ms for crossfading segments
64: PAUSE_DURATION = 1000 # ms for silence between segments
65:
66:
67: class ShutdownHandler:
68: """Handles graceful shutdown of the worker."""
69:
70: def __init__(self) -> None:
71: """Initialize shutdown handler."""
72: self.shutdown_requested = threading.Event()
73: self.current_job_id: int | None = None
74: self.lock = threading.Lock()
75:
76: # Register signal handlers
77: signal.signal(signal.SIGTERM, self._handle_signal)
78: signal.signal(signal.SIGINT, self._handle_signal)
79:
80: def _handle_signal(self, signum: int, _frame: Any) -> None:
81: """Handle shutdown signals."""
82: logger.info(
83: "Received signal %d, initiating graceful shutdown...",
84: signum,
85: )
86: self.shutdown_requested.set()
87:
88: def is_shutdown_requested(self) -> bool:
89: """Check if shutdown has been requested."""
90: return self.shutdown_requested.is_set()
91:
92: def set_current_job(self, job_id: int | None) -> None:
93: """Set the currently processing job."""
94: with self.lock:
95: self.current_job_id = job_id
96:
97: def get_current_job(self) -> int | None:
98: """Get the currently processing job."""
99: with self.lock:
100: return self.current_job_id
Biz/PodcastItLater/Worker.py @100-200
100: return self.current_job_id
101:
102:
103: class ArticleProcessor:
104: """Handles the complete article-to-podcast conversion pipeline."""
105:
106: def __init__(self, shutdown_handler: ShutdownHandler) -> None:
107: """Initialize the processor with required services.
108:
109: Raises:
110: ValueError: If OPENAI_API_KEY environment variable is not set.
111: """
112: if not OPENAI_API_KEY:
113: msg = "OPENAI_API_KEY environment variable is required"
114: raise ValueError(msg)
115:
116: self.openai_client: openai.OpenAI = openai.OpenAI(
117: api_key=OPENAI_API_KEY,
118: )
119: self.shutdown_handler = shutdown_handler
120:
121: # Initialize S3 client for Digital Ocean Spaces
122: if all([S3_ENDPOINT, S3_BUCKET, S3_ACCESS_KEY, S3_SECRET_KEY]):
123: self.s3_client: Any = boto3.client(
124: "s3",
125: endpoint_url=S3_ENDPOINT,
126: aws_access_key_id=S3_ACCESS_KEY,
127: aws_secret_access_key=S3_SECRET_KEY,
128: )
129: else:
130: logger.warning("S3 configuration incomplete, uploads will fail")
131: self.s3_client = None
132:
133: @staticmethod
134: def extract_article_content(
135: url: str,
136: ) -> tuple[str, str, str | None, str | None]:
137: """Extract title, content, author, and date from article URL.
138:
139: Returns:
140: tuple: (title, content, author, publication_date)
141:
142: Raises:
143: ValueError: If content cannot be downloaded, extracted, or large.
144: """
145: try:
146: downloaded = trafilatura.fetch_url(url)
147: if not downloaded:
148: msg = f"Failed to download content from {url}"
149: raise ValueError(msg) # noqa: TRY301
150:
151: # Check size before processing
152: if (
153: len(downloaded) > MAX_ARTICLE_SIZE * 4
154: ): # Rough HTML to text ratio
155: msg = f"Article too large: {len(downloaded)} bytes"
156: raise ValueError(msg) # noqa: TRY301
157:
158: # Extract with metadata
159: result = trafilatura.extract(
160: downloaded,
161: include_comments=False,
162: include_tables=False,
163: with_metadata=True,
164: output_format="json",
165: )
166:
167: if not result:
168: msg = f"Failed to extract content from {url}"
169: raise ValueError(msg) # noqa: TRY301
170:
171: data = json.loads(result)
172:
173: title = data.get("title", "Untitled Article")
174: content = data.get("text", "")
175: author = data.get("author")
176: pub_date = data.get("date")
177:
178: if not content:
179: msg = f"No content extracted from {url}"
180: raise ValueError(msg) # noqa: TRY301
181:
182: # Enforce content size limit
183: if len(content) > MAX_ARTICLE_SIZE:
184: logger.warning(
185: "Article content truncated from %d to %d characters",
186: len(content),
187: MAX_ARTICLE_SIZE,
188: )
189: content = content[:MAX_ARTICLE_SIZE]
190:
191: logger.info(
192: "Extracted article: %s (%d chars, author: %s, date: %s)",
193: title,
194: len(content),
195: author or "unknown",
196: pub_date or "unknown",
197: )
198: except Exception:
199: logger.exception("Failed to extract content from %s", url)
200: raise
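Before moving on: the ShutdownHandler read above shows SIGTERM/SIGINT simply setting a threading.Event, and presumably the poll loop waits on that same Event as an interruptible sleep rather than calling time.sleep. A minimal standalone sketch of the pattern (not code from Worker.py):

import signal
import threading

stop = threading.Event()
signal.signal(signal.SIGTERM, lambda signum, frame: stop.set())

while not stop.is_set():
    ...  # process one batch of jobs
    stop.wait(timeout=30)  # sleeps up to 30s, wakes immediately on SIGTERM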
Let me continue reading to understand the full structure:
^class Test
./Omni/Repl.py-212- def edit(self) -> None:
./Omni/Repl.py-213- """Edit the current namespace."""
./Omni/Repl.py-214- edit_file(self.ns, self.path, self.editor)
./Omni/Repl.py-215-
./Omni/Repl.py-216-
./Omni/Repl.py:217:class TestCustomRepl(unittest.TestCase):
./Omni/Repl.py-218- """Test the CustomRepl functionality."""
./Omni/Repl.py-219-
./Omni/Repl.py-220- def setUp(self) -> None:
./Omni/Repl.py-221- """Create a CustomRepl for testing."""
./Omni/Repl.py-222- ns = __name__
--
./Omni/Llamacpp.py-16-import os
./Omni/Llamacpp.py-17-import sys
./Omni/Llamacpp.py-18-import unittest
./Omni/Llamacpp.py-19-
./Omni/Llamacpp.py-20-
./Omni/Llamacpp.py:21:class TestLlamaCpp(unittest.TestCase):
./Omni/Llamacpp.py-22- """Test that llama.cpp is available."""
./Omni/Llamacpp.py-23-
./Omni/Llamacpp.py-24- def test_in_path(self) -> None:
./Omni/Llamacpp.py-25- """Test that llama.cpp is in $PATH."""
./Omni/Llamacpp.py-26- self.assertIn("llama-cpp", os.environ.get("PATH", ""))
--
./Omni/Bild/Example.py-41-def hello(name: str) -> str:
./Omni/Bild/Example.py-42- """Say hello."""
./Omni/Bild/Example.py-43- return f"Hello {name}"
./Omni/Bild/Example.py-44-
./Omni/Bild/Example.py-45-
./Omni/Bild/Example.py:46:class TestExample(Test.TestCase):
./Omni/Bild/Example.py-47- """Test the Example module."""
./Omni/Bild/Example.py-48-
./Omni/Bild/Example.py-49- def test_hello(self) -> None:
./Omni/Bild/Example.py-50- """Test `hello` function."""
./Omni/Bild/Example.py-51- self.assertEqual("Hello Ben", hello("Ben"))
--
./Omni/Test.py-7-import unittest
./Omni/Test.py-8-
./Omni/Test.py-9-TestCase = unittest.TestCase
./Omni/Test.py-10-
./Omni/Test.py-11-
./Omni/Test.py:12:class TestError(Exception):
./Omni/Test.py-13- """When the test environment or harness encounters a problem."""
./Omni/Test.py-14-
./Omni/Test.py-15-
./Omni/Test.py-16-def run(area: App.Area, tests: list[typing.Any]) -> None:
./Omni/Test.py-17- """Run the given tests with loglevel determined by area."""
--
./Biz/PodcastItLater/Web.py-1917- def tearDown() -> None:
./Biz/PodcastItLater/Web.py-1918- """Clean up test database."""
./Biz/PodcastItLater/Web.py-1919- Core.Database.teardown()
./Biz/PodcastItLater/Web.py-1920-
./Biz/PodcastItLater/Web.py-1921-
./Biz/PodcastItLater/Web.py:1922:class TestDurationFormatting(Test.TestCase):
./Biz/PodcastItLater/Web.py-1923- """Test duration formatting functionality."""
./Biz/PodcastItLater/Web.py-1924-
./Biz/PodcastItLater/Web.py-1925- def test_format_duration_minutes_only(self) -> None:
./Biz/PodcastItLater/Web.py-1926- """Test formatting durations less than an hour."""
./Biz/PodcastItLater/Web.py-1927- self.assertEqual(UI.format_duration(60), "1m")
--
./Biz/PodcastItLater/Web.py-1948- self.assertEqual(UI.format_duration(None), "Unknown")
./Biz/PodcastItLater/Web.py-1949- self.assertEqual(UI.format_duration(0), "Unknown")
./Biz/PodcastItLater/Web.py-1950- self.assertEqual(UI.format_duration(-100), "Unknown")
./Biz/PodcastItLater/Web.py-1951-
./Biz/PodcastItLater/Web.py-1952-
./Biz/PodcastItLater/Web.py:1953:class TestAuthentication(BaseWebTest):
./Biz/PodcastItLater/Web.py-1954- """Test authentication functionality."""
./Biz/PodcastItLater/Web.py-1955-
./Biz/PodcastItLater/Web.py-1956- def test_login_new_user_active(self) -> None:
./Biz/PodcastItLater/Web.py-1957- """New users should be created with active status."""
./Biz/PodcastItLater/Web.py-1958- response = self.client.post("/login", data={"email": "new@example.com"})
--
./Biz/PodcastItLater/Web.py-2070- )
./Biz/PodcastItLater/Web.py-2071-
./Biz/PodcastItLater/Web.py-2072- self.assertIn("Please login first", response.text)
./Biz/PodcastItLater/Web.py-2073-
./Biz/PodcastItLater/Web.py-2074-
./Biz/PodcastItLater/Web.py:2075:class TestArticleSubmission(BaseWebTest):
./Biz/PodcastItLater/Web.py-2076- """Test article submission functionality."""
./Biz/PodcastItLater/Web.py-2077-
./Biz/PodcastItLater/Web.py-2078- def setUp(self) -> None:
./Biz/PodcastItLater/Web.py-2079- """Set up test client with logged-in user."""
./Biz/PodcastItLater/Web.py-2080- super().setUp()
--
./Biz/PodcastItLater/Web.py-2146- # Should return HTML fragment, not full page
./Biz/PodcastItLater/Web.py-2147- self.assertNotIn("<!DOCTYPE", response.text)
./Biz/PodcastItLater/Web.py-2148- self.assertIn("<div", response.text)
./Biz/PodcastItLater/Web.py-2149-
./Biz/PodcastItLater/Web.py-2150-
./Biz/PodcastItLater/Web.py:2151:class TestRSSFeed(BaseWebTest):
./Biz/PodcastItLater/Web.py-2152- """Test RSS feed generation."""
./Biz/PodcastItLater/Web.py-2153-
./Biz/PodcastItLater/Web.py-2154- def setUp(self) -> None:
./Biz/PodcastItLater/Web.py-2155- """Set up test client and create test data."""
./Biz/PodcastItLater/Web.py-2156- super().setUp()
--
./Biz/PodcastItLater/Web.py-2275- self.assertEqual(response_xml.status_code, 200)
./Biz/PodcastItLater/Web.py-2276- self.assertEqual(response_rss.status_code, 200)
./Biz/PodcastItLater/Web.py-2277- self.assertEqual(response_xml.text, response_rss.text)
./Biz/PodcastItLater/Web.py-2278-
./Biz/PodcastItLater/Web.py-2279-
./Biz/PodcastItLater/Web.py:2280:class TestAdminInterface(BaseWebTest):
./Biz/PodcastItLater/Web.py-2281- """Test admin interface functionality."""
./Biz/PodcastItLater/Web.py-2282-
./Biz/PodcastItLater/Web.py-2283- def setUp(self) -> None:
./Biz/PodcastItLater/Web.py-2284- """Set up test client with logged-in user."""
./Biz/PodcastItLater/Web.py-2285- super().setUp()
--
./Biz/PodcastItLater/Web.py-2386- # Should show status counts
./Biz/PodcastItLater/Web.py-2387- self.assertIn("ERROR: 1", response.text)
./Biz/PodcastItLater/Web.py-2388- self.assertIn("PROCESSING: 1", response.text)
./Biz/PodcastItLater/Web.py-2389-
./Biz/PodcastItLater/Web.py-2390-
./Biz/PodcastItLater/Web.py:2391:class TestMetricsDashboard(BaseWebTest):
./Biz/PodcastItLater/Web.py-2392- """Test metrics dashboard functionality."""
./Biz/PodcastItLater/Web.py-2393-
./Biz/PodcastItLater/Web.py-2394- def setUp(self) -> None:
./Biz/PodcastItLater/Web.py-2395- """Set up test client with logged-in admin user."""
./Biz/PodcastItLater/Web.py-2396- super().setUp()
--
./Biz/PodcastItLater/Web.py-2535- self.assertIn("Episode Metrics", response.text)
./Biz/PodcastItLater/Web.py-2536- # Should show 0 for counts
./Biz/PodcastItLater/Web.py-2537- self.assertIn("Total Episodes", response.text)
./Biz/PodcastItLater/Web.py-2538-
./Biz/PodcastItLater/Web.py-2539-
./Biz/PodcastItLater/Web.py:2540:class TestJobCancellation(BaseWebTest):
./Biz/PodcastItLater/Web.py-2541- """Test job cancellation functionality."""
./Biz/PodcastItLater/Web.py-2542-
./Biz/PodcastItLater/Web.py-2543- def setUp(self) -> None:
./Biz/PodcastItLater/Web.py-2544- """Set up test client with logged-in user and pending job."""
./Biz/PodcastItLater/Web.py-2545- super().setUp()
--
./Biz/PodcastItLater/Web.py-2652- f'hx-post="/queue/{processing_job}/cancel"',
./Biz/PodcastItLater/Web.py-2653- response.text,
./Biz/PodcastItLater/Web.py-2654- )
./Biz/PodcastItLater/Web.py-2655-
./Biz/PodcastItLater/Web.py-2656-
./Biz/PodcastItLater/Web.py:2657:class TestEpisodeDetailPage(BaseWebTest):
./Biz/PodcastItLater/Web.py-2658- """Test episode detail page functionality."""
./Biz/PodcastItLater/Web.py-2659-
./Biz/PodcastItLater/Web.py-2660- def setUp(self) -> None:
./Biz/PodcastItLater/Web.py-2661- """Set up test client with user and episode."""
./Biz/PodcastItLater/Web.py-2662- super().setUp()
--
./Biz/PodcastItLater/Web.py-2753- response.headers["location"],
./Biz/PodcastItLater/Web.py-2754- f"/episode/{self.episode_sqid}",
./Biz/PodcastItLater/Web.py-2755- )
./Biz/PodcastItLater/Web.py-2756-
./Biz/PodcastItLater/Web.py-2757-
./Biz/PodcastItLater/Web.py:2758:class TestPublicFeed(BaseWebTest):
./Biz/PodcastItLater/Web.py-2759- """Test public feed functionality."""
./Biz/PodcastItLater/Web.py-2760-
./Biz/PodcastItLater/Web.py-2761- def setUp(self) -> None:
./Biz/PodcastItLater/Web.py-2762- """Set up test database, client, and create sample episodes."""
./Biz/PodcastItLater/Web.py-2763- super().setUp()
--
./Biz/PodcastItLater/Web.py-2918- public_episodes = Core.Database.get_public_episodes()
./Biz/PodcastItLater/Web.py-2919- episode_ids = [e["id"] for e in public_episodes]
./Biz/PodcastItLater/Web.py-2920- self.assertIn(user_episode_id, episode_ids)
./Biz/PodcastItLater/Web.py-2921-
./Biz/PodcastItLater/Web.py-2922-
./Biz/PodcastItLater/Web.py:2923:class TestEpisodeDeduplication(BaseWebTest):
./Biz/PodcastItLater/Web.py-2924- """Test episode deduplication functionality."""
./Biz/PodcastItLater/Web.py-2925-
./Biz/PodcastItLater/Web.py-2926- def setUp(self) -> None:
./Biz/PodcastItLater/Web.py-2927- """Set up test database, client, and create test user."""
./Biz/PodcastItLater/Web.py-2928- super().setUp()
--
./Biz/PodcastItLater/Web.py-2992- episode_ids = [e["id"] for e in episodes]
./Biz/PodcastItLater/Web.py-2993-
./Biz/PodcastItLater/Web.py-2994- self.assertIn(self.episode_id, episode_ids)
./Biz/PodcastItLater/Web.py-2995-
./Biz/PodcastItLater/Web.py-2996-
./Biz/PodcastItLater/Web.py:2997:class TestMetricsTracking(BaseWebTest):
./Biz/PodcastItLater/Web.py-2998- """Test episode metrics tracking."""
./Biz/PodcastItLater/Web.py-2999-
./Biz/PodcastItLater/Web.py-3000- def setUp(self) -> None:
./Biz/PodcastItLater/Web.py-3001- """Set up test database, client, and create test episode."""
./Biz/PodcastItLater/Web.py-3002- super().setUp()
--
./Biz/PodcastItLater/Web.py-3072- metrics = Core.Database.get_episode_metric_events(self.episode_id)
./Biz/PodcastItLater/Web.py-3073- played_metrics = [m for m in metrics if m["event_type"] == "played"]
./Biz/PodcastItLater/Web.py-3074- self.assertGreater(len(played_metrics), 0)
./Biz/PodcastItLater/Web.py-3075-
./Biz/PodcastItLater/Web.py-3076-
./Biz/PodcastItLater/Web.py:3077:class TestUsageLimits(BaseWebTest):
./Biz/PodcastItLater/Web.py-3078- """Test usage tracking and limit enforcement."""
./Biz/PodcastItLater/Web.py-3079-
./Biz/PodcastItLater/Web.py-3080- def setUp(self) -> None:
./Biz/PodcastItLater/Web.py-3081- """Set up test with free tier user."""
./Biz/PodcastItLater/Web.py-3082- super().setUp()
--
./Biz/PodcastItLater/Web.py-3250- self.assertTrue(allowed)
./Biz/PodcastItLater/Web.py-3251- self.assertEqual(msg, "")
./Biz/PodcastItLater/Web.py-3252- self.assertEqual(usage["articles"], 20)
./Biz/PodcastItLater/Web.py-3253-
./Biz/PodcastItLater/Web.py-3254-
./Biz/PodcastItLater/Web.py:3255:class TestAccountPage(BaseWebTest):
./Biz/PodcastItLater/Web.py-3256- """Test account page functionality."""
./Biz/PodcastItLater/Web.py-3257-
./Biz/PodcastItLater/Web.py-3258- def setUp(self) -> None:
./Biz/PodcastItLater/Web.py-3259- """Set up test with user."""
./Biz/PodcastItLater/Web.py-3260- super().setUp()
--
./Biz/PodcastItLater/Web.py-3367- # Verify session cleared
./Biz/PodcastItLater/Web.py-3368- response = self.client.get("/account", follow_redirects=False)
./Biz/PodcastItLater/Web.py-3369- self.assertEqual(response.status_code, 307)
./Biz/PodcastItLater/Web.py-3370-
./Biz/PodcastItLater/Web.py-3371-
./Biz/PodcastItLater/Web.py:3372:class TestAdminUsers(BaseWebTest):
./Biz/PodcastItLater/Web.py-3373- """Test admin user management functionality."""
./Biz/PodcastItLater/Web.py-3374-
./Biz/PodcastItLater/Web.py-3375- def setUp(self) -> None:
./Biz/PodcastItLater/Web.py-3376- """Set up test client with logged-in admin user."""
./Biz/PodcastItLater/Web.py-3377- super().setUp()
--
./Biz/PodcastItLater/Test.py-47- def tearDown() -> None:
./Biz/PodcastItLater/Test.py-48- """Clean up after each test."""
./Biz/PodcastItLater/Test.py-49- Core.Database.teardown()
./Biz/PodcastItLater/Test.py-50-
./Biz/PodcastItLater/Test.py-51-
./Biz/PodcastItLater/Test.py:52:class TestEndToEnd(BaseWebTest):
./Biz/PodcastItLater/Test.py-53- """Test complete end-to-end flows."""
./Biz/PodcastItLater/Test.py-54-
./Biz/PodcastItLater/Test.py-55- def setUp(self) -> None:
./Biz/PodcastItLater/Test.py-56- """Set up test client with logged-in user."""
./Biz/PodcastItLater/Test.py-57- super().setUp()
--
./Biz/PodcastItLater/Test.py-208- response = self.client.get(f"/feed/{token2}.rss")
./Biz/PodcastItLater/Test.py-209- self.assertNotIn("Great Article Title", response.text)
./Biz/PodcastItLater/Test.py-210- self.assertIn("Other User's Article", response.text)
./Biz/PodcastItLater/Test.py-211-
./Biz/PodcastItLater/Test.py-212-
./Biz/PodcastItLater/Test.py:213:class TestUI(Test.TestCase):
./Biz/PodcastItLater/Test.py-214- """Test UI components."""
./Biz/PodcastItLater/Test.py-215-
./Biz/PodcastItLater/Test.py-216- def test_render_navbar(self) -> None:
./Biz/PodcastItLater/Test.py-217- """Test navbar rendering."""
./Biz/PodcastItLater/Test.py-218- user = {"email": "test@example.com", "id": 1}
--
./Biz/PodcastItLater.md-189-# Main code implementation
./Biz/PodcastItLater.md-190-class Database:
./Biz/PodcastItLater.md-191- ...
./Biz/PodcastItLater.md-192-
./Biz/PodcastItLater.md-193-# Test class next to the class it is testing
./Biz/PodcastItLater.md:194:class TestDatabase(Test.TestCase):
./Biz/PodcastItLater.md-195- """Test the Database class."""
./Biz/PodcastItLater.md-196-
./Biz/PodcastItLater.md-197- def test_init_db(self) -> None:
./Biz/PodcastItLater.md-198- """Verify all tables and indexes are created correctly."""
./Biz/PodcastItLater.md-199- ...
--
./Biz/PodcastItLater/TestMetricsView.py-29- def tearDown() -> None:
./Biz/PodcastItLater/TestMetricsView.py-30- """Clean up test database."""
./Biz/PodcastItLater/TestMetricsView.py-31- Core.Database.teardown()
./Biz/PodcastItLater/TestMetricsView.py-32-
./Biz/PodcastItLater/TestMetricsView.py-33-
./Biz/PodcastItLater/TestMetricsView.py:34:class TestMetricsView(BaseWebTest):
./Biz/PodcastItLater/TestMetricsView.py-35- """Test Admin Metrics View."""
./Biz/PodcastItLater/TestMetricsView.py-36-
./Biz/PodcastItLater/TestMetricsView.py-37- def test_admin_metrics_view_access(self) -> None:
./Biz/PodcastItLater/TestMetricsView.py-38- """Admin user should be able to access metrics view."""
./Biz/PodcastItLater/TestMetricsView.py-39- # Create admin user
--
./Biz/PodcastItLater/Worker.py-1047- except Exception:
./Biz/PodcastItLater/Worker.py-1048- logger.exception("Worker crashed")
./Biz/PodcastItLater/Worker.py-1049- raise
./Biz/PodcastItLater/Worker.py-1050-
./Biz/PodcastItLater/Worker.py-1051-
./Biz/PodcastItLater/Worker.py:1052:class TestArticleExtraction(Test.TestCase):
./Biz/PodcastItLater/Worker.py-1053- """Test article extraction functionality."""
./Biz/PodcastItLater/Worker.py-1054-
./Biz/PodcastItLater/Worker.py-1055- def test_extract_valid_article(self) -> None:
./Biz/PodcastItLater/Worker.py-1056- """Extract from well-formed HTML."""
./Biz/PodcastItLater/Worker.py-1057- # Mock trafilatura.fetch_url and extract
--
./Biz/PodcastItLater/Worker.py-1188- self.assertEqual(content, "Good content")
./Biz/PodcastItLater/Worker.py-1189- self.assertNotIn("script", content)
./Biz/PodcastItLater/Worker.py-1190- self.assertNotIn("table", content)
./Biz/PodcastItLater/Worker.py-1191-
./Biz/PodcastItLater/Worker.py-1192-
./Biz/PodcastItLater/Worker.py:1193:class TestTextToSpeech(Test.TestCase):
./Biz/PodcastItLater/Worker.py-1194- """Test text-to-speech functionality."""
./Biz/PodcastItLater/Worker.py-1195-
./Biz/PodcastItLater/Worker.py-1196- def setUp(self) -> None:
./Biz/PodcastItLater/Worker.py-1197- """Set up mocks."""
./Biz/PodcastItLater/Worker.py-1198- # Mock OpenAI API key
--
./Biz/PodcastItLater/Worker.py-1617- audio_data = processor.text_to_speech("Test content", "Test Title")
./Biz/PodcastItLater/Worker.py-1618-
./Biz/PodcastItLater/Worker.py-1619- self.assertEqual(audio_data, b"ordered-audio")
./Biz/PodcastItLater/Worker.py-1620-
./Biz/PodcastItLater/Worker.py-1621-
./Biz/PodcastItLater/Worker.py:1622:class TestIntroOutro(Test.TestCase):
./Biz/PodcastItLater/Worker.py-1623- """Test intro and outro generation with metadata."""
./Biz/PodcastItLater/Worker.py-1624-
./Biz/PodcastItLater/Worker.py-1625- def test_create_intro_text_full_metadata(self) -> None:
./Biz/PodcastItLater/Worker.py-1626- """Test intro text creation with all metadata."""
./Biz/PodcastItLater/Worker.py-1627- intro = ArticleProcessor._create_intro_text( # noqa: SLF001
--
./Biz/PodcastItLater/Worker.py-1703- self.assertEqual(content, "Article content")
./Biz/PodcastItLater/Worker.py-1704- self.assertEqual(author, "Test Author")
./Biz/PodcastItLater/Worker.py-1705- self.assertEqual(pub_date, "2024-01-15")
./Biz/PodcastItLater/Worker.py-1706-
./Biz/PodcastItLater/Worker.py-1707-
./Biz/PodcastItLater/Worker.py:1708:class TestMemoryEfficiency(Test.TestCase):
./Biz/PodcastItLater/Worker.py-1709- """Test memory-efficient processing."""
./Biz/PodcastItLater/Worker.py-1710-
./Biz/PodcastItLater/Worker.py-1711- def test_large_article_size_limit(self) -> None:
./Biz/PodcastItLater/Worker.py-1712- """Test that articles exceeding size limits are rejected."""
./Biz/PodcastItLater/Worker.py-1713- huge_text = "x" * (MAX_ARTICLE_SIZE + 1000) # Exceed limit
--
./Biz/PodcastItLater/Worker.py-1756- self.assertIsInstance(usage, float)
./Biz/PodcastItLater/Worker.py-1757- self.assertGreaterEqual(usage, 0.0)
./Biz/PodcastItLater/Worker.py-1758- self.assertLessEqual(usage, 100.0)
./Biz/PodcastItLater/Worker.py-1759-
./Biz/PodcastItLater/Worker.py-1760-
./Biz/PodcastItLater/Worker.py:1761:class TestJobProcessing(Test.TestCase):
./Biz/PodcastItLater/Worker.py-1762- """Test job processing functionality."""
./Biz/PodcastItLater/Worker.py-1763-
./Biz/PodcastItLater/Worker.py-1764- def setUp(self) -> None:
./Biz/PodcastItLater/Worker.py-1765- """Set up test environment."""
./Biz/PodcastItLater/Worker.py-1766- Core.Database.init_db()
--
./Biz/PodcastItLater/Worker.py-2063-
./Biz/PodcastItLater/Worker.py-2064- # Job should not be processed (no status updates)
./Biz/PodcastItLater/Worker.py-2065- mock_update.assert_not_called()
./Biz/PodcastItLater/Worker.py-2066-
./Biz/PodcastItLater/Worker.py-2067-
./Biz/PodcastItLater/Worker.py:2068:class TestWorkerErrorHandling(Test.TestCase):
./Biz/PodcastItLater/Worker.py-2069- """Test worker error handling and recovery."""
./Biz/PodcastItLater/Worker.py-2070-
./Biz/PodcastItLater/Worker.py-2071- def setUp(self) -> None:
./Biz/PodcastItLater/Worker.py-2072- """Set up test environment."""
./Biz/PodcastItLater/Worker.py-2073- Core.Database.init_db()
--
./Biz/PodcastItLater/Worker.py-2151- self.assertIsNotNone(job)
./Biz/PodcastItLater/Worker.py-2152- if job:
./Biz/PodcastItLater/Worker.py-2153- self.assertEqual(job["status"], "error")
./Biz/PodcastItLater/Worker.py-2154-
./Biz/PodcastItLater/Worker.py-2155-
./Biz/PodcastItLater/Worker.py:2156:class TestTextChunking(Test.TestCase):
./Biz/PodcastItLater/Worker.py-2157- """Test text chunking edge cases."""
./Biz/PodcastItLater/Worker.py-2158-
./Biz/PodcastItLater/Worker.py-2159- def test_split_text_single_long_word(self) -> None:
./Biz/PodcastItLater/Worker.py-2160- """Handle text with a single word exceeding limit."""
./Biz/PodcastItLater/Worker.py-2161- long_word = "a" * 4000
--
./Biz/PodcastItLater/Billing.py-472-
./Biz/PodcastItLater/Billing.py-473-# Tests
./Biz/PodcastItLater/Billing.py-474-# ruff: noqa: PLR6301, PLW0603, S101
./Biz/PodcastItLater/Billing.py-475-
./Biz/PodcastItLater/Billing.py-476-
./Biz/PodcastItLater/Billing.py:477:class TestWebhookHandling(Test.TestCase):
./Biz/PodcastItLater/Billing.py-478- """Test Stripe webhook handling."""
./Biz/PodcastItLater/Billing.py-479-
./Biz/PodcastItLater/Billing.py-480- def setUp(self) -> None:
./Biz/PodcastItLater/Billing.py-481- """Set up test database."""
./Biz/PodcastItLater/Billing.py-482- Core.Database.init_db()
--
./Biz/PodcastItLater/Core.py-1544- minutes = total_seconds // 60
./Biz/PodcastItLater/Core.py-1545-
./Biz/PodcastItLater/Core.py-1546- return {"articles": articles, "minutes": minutes}
./Biz/PodcastItLater/Core.py-1547-
./Biz/PodcastItLater/Core.py-1548-
./Biz/PodcastItLater/Core.py:1549:class TestDatabase(Test.TestCase):
./Biz/PodcastItLater/Core.py-1550- """Test the Database class."""
./Biz/PodcastItLater/Core.py-1551-
./Biz/PodcastItLater/Core.py-1552- @staticmethod
./Biz/PodcastItLater/Core.py-1553- def setUp() -> None:
./Biz/PodcastItLater/Core.py-1554- """Set up test database."""
--
./Biz/PodcastItLater/Core.py-1638- self.assertEqual(metrics["total_users"], 1)
./Biz/PodcastItLater/Core.py-1639- self.assertEqual(metrics["submissions_24h"], 1)
./Biz/PodcastItLater/Core.py-1640- self.assertEqual(metrics["submissions_7d"], 1)
./Biz/PodcastItLater/Core.py-1641-
./Biz/PodcastItLater/Core.py-1642-
./Biz/PodcastItLater/Core.py:1643:class TestUserManagement(Test.TestCase):
./Biz/PodcastItLater/Core.py-1644- """Test user management functionality."""
./Biz/PodcastItLater/Core.py-1645-
./Biz/PodcastItLater/Core.py-1646- @staticmethod
./Biz/PodcastItLater/Core.py-1647- def setUp() -> None:
./Biz/PodcastItLater/Core.py-1648- """Set up test database."""
--
./Biz/PodcastItLater/Core.py-1795- # Try to update user1 to user2's email
./Biz/PodcastItLater/Core.py-1796- with pytest.raises(ValueError, match="already taken"):
./Biz/PodcastItLater/Core.py-1797- Database.update_user_email(user_id1, "user2@example.com")
./Biz/PodcastItLater/Core.py-1798-
./Biz/PodcastItLater/Core.py-1799-
./Biz/PodcastItLater/Core.py:1800:class TestQueueOperations(Test.TestCase):
./Biz/PodcastItLater/Core.py-1801- """Test queue operations."""
./Biz/PodcastItLater/Core.py-1802-
./Biz/PodcastItLater/Core.py-1803- def setUp(self) -> None:
./Biz/PodcastItLater/Core.py-1804- """Set up test database with a user."""
./Biz/PodcastItLater/Core.py-1805- Database.init_db()
--
./Biz/PodcastItLater/Core.py-2041- self.assertEqual(Database.get_queue_position(job1), 1)
./Biz/PodcastItLater/Core.py-2042- self.assertIsNone(Database.get_queue_position(job2))
./Biz/PodcastItLater/Core.py-2043- self.assertEqual(Database.get_queue_position(job3), 2)
./Biz/PodcastItLater/Core.py-2044-
./Biz/PodcastItLater/Core.py-2045-
./Biz/PodcastItLater/Core.py:2046:class TestEpisodeManagement(Test.TestCase):
./Biz/PodcastItLater/Core.py-2047- """Test episode management functionality."""
./Biz/PodcastItLater/Core.py-2048-
./Biz/PodcastItLater/Core.py-2049- def setUp(self) -> None:
./Biz/PodcastItLater/Core.py-2050- """Set up test database with a user."""
./Biz/PodcastItLater/Core.py-2051- Database.init_db()
Now let me check the Worker.py file to see the test classes and understand where they are:
^class Test in Biz/PodcastItLater/Worker.py
1052:class TestArticleExtraction(Test.TestCase):
1193:class TestTextToSpeech(Test.TestCase):
1622:class TestIntroOutro(Test.TestCase):
1708:class TestMemoryEfficiency(Test.TestCase):
1761:class TestJobProcessing(Test.TestCase):
2068:class TestWorkerErrorHandling(Test.TestCase):
2156:class TestTextChunking(Test.TestCase):
Let me get a full overview of the file structure:
^(class |def |# ) in Biz/PodcastItLater/Worker.py
3:# : dep boto3
4:# : dep botocore
5:# : dep openai
6:# : dep psutil
7:# : dep pydub
8:# : dep pytest
9:# : dep pytest-asyncio
10:# : dep pytest-mock
11:# : dep trafilatura
12:# : out podcastitlater-worker
13:# : run ffmpeg
47:# Configuration from environment variables
55:# Worker configuration
67:class ShutdownHandler:
103:class ArticleProcessor:
721:def prepare_text_for_tts(text: str, title: str) -> list[str]:
761:def split_text_into_chunks(text: str, max_chars: int = 3000) -> list[str]:
799:def edit_chunk_for_speech(
874:def parse_datetime_with_timezone(created_at: str | datetime) -> datetime:
890:def should_retry_job(job: dict[str, Any], max_retries: int) -> bool:
907:def process_pending_jobs(
939:def process_retryable_jobs() -> None:
958:def check_memory_usage() -> int | Any:
969:def cleanup_stale_jobs() -> None:
990:def main_loop() -> None:
1036:def move() -> None:
1052:class TestArticleExtraction(Test.TestCase):
1193:class TestTextToSpeech(Test.TestCase):
1622:class TestIntroOutro(Test.TestCase):
1708:class TestMemoryEfficiency(Test.TestCase):
1761:class TestJobProcessing(Test.TestCase):
2068:class TestWorkerErrorHandling(Test.TestCase):
2156:class TestTextChunking(Test.TestCase):
2179:def test() -> None:
2194:def main() -> None:
Let me check the total lines and get a better picture:
wc -l Biz/PodcastItLater/Worker.py
2199 Biz/PodcastItLater/Worker.py
Good. Now let me read the key sections to understand the structure better:
Biz/PodcastItLater/Worker.py @103-140
103: class ArticleProcessor:
104: """Handles the complete article-to-podcast conversion pipeline."""
105:
106: def __init__(self, shutdown_handler: ShutdownHandler) -> None:
107: """Initialize the processor with required services.
108:
109: Raises:
110: ValueError: If OPENAI_API_KEY environment variable is not set.
111: """
112: if not OPENAI_API_KEY:
113: msg = "OPENAI_API_KEY environment variable is required"
114: raise ValueError(msg)
115:
116: self.openai_client: openai.OpenAI = openai.OpenAI(
117: api_key=OPENAI_API_KEY,
118: )
119: self.shutdown_handler = shutdown_handler
120:
121: # Initialize S3 client for Digital Ocean Spaces
122: if all([S3_ENDPOINT, S3_BUCKET, S3_ACCESS_KEY, S3_SECRET_KEY]):
123: self.s3_client: Any = boto3.client(
124: "s3",
125: endpoint_url=S3_ENDPOINT,
126: aws_access_key_id=S3_ACCESS_KEY,
127: aws_secret_access_key=S3_SECRET_KEY,
128: )
129: else:
130: logger.warning("S3 configuration incomplete, uploads will fail")
131: self.s3_client = None
132:
133: @staticmethod
134: def extract_article_content(
135: url: str,
136: ) -> tuple[str, str, str | None, str | None]:
137: """Extract title, content, author, and date from article URL.
138:
139: Returns:
140: tuple: (title, content, author, publication_date)
Biz/PodcastItLater/Worker.py @715-730
715: raise
716: finally:
717: # Clear current job
718: self.shutdown_handler.set_current_job(None)
719:
720:
721: def prepare_text_for_tts(text: str, title: str) -> list[str]:
722: """Use LLM to prepare text for TTS, returning chunks ready for speech.
723:
724: First splits text mechanically, then has LLM edit each chunk.
725: """
726: # First, split the text into manageable chunks
727: raw_chunks = split_text_into_chunks(text, max_chars=3000)
728:
729: logger.info("Split article into %d raw chunks", len(raw_chunks))
730:
Let me read the functions that need to be moved:
Biz/PodcastItLater/Worker.py @721-873
721: def prepare_text_for_tts(text: str, title: str) -> list[str]:
722: """Use LLM to prepare text for TTS, returning chunks ready for speech.
723:
724: First splits text mechanically, then has LLM edit each chunk.
725: """
726: # First, split the text into manageable chunks
727: raw_chunks = split_text_into_chunks(text, max_chars=3000)
728:
729: logger.info("Split article into %d raw chunks", len(raw_chunks))
730:
731: # Prepare the first chunk with intro
732: edited_chunks = []
733:
734: for i, chunk in enumerate(raw_chunks):
735: is_first = i == 0
736: is_last = i == len(raw_chunks) - 1
737:
738: try:
739: edited_chunk = edit_chunk_for_speech(
740: chunk,
741: title=title if is_first else None,
742: is_first=is_first,
743: is_last=is_last,
744: )
745: edited_chunks.append(edited_chunk)
746: except Exception:
747: logger.exception("Failed to edit chunk %d", i + 1)
748: # Fall back to raw chunk if LLM fails
749: if is_first:
750: edited_chunks.append(
751: f"This is an audio version of {title}. {chunk}",
752: )
753: elif is_last:
754: edited_chunks.append(f"{chunk} This concludes the article.")
755: else:
756: edited_chunks.append(chunk)
757:
758: return edited_chunks
759:
760:
761: def split_text_into_chunks(text: str, max_chars: int = 3000) -> list[str]:
762: """Split text into chunks at sentence boundaries."""
763: chunks = []
764: current_chunk = ""
765:
766: # Split into paragraphs first
767: paragraphs = text.split("\n\n")
768:
769: for para in paragraphs:
770: para_stripped = para.strip()
771: if not para_stripped:
772: continue
773:
774: # If paragraph itself is too long, split by sentences
775: if len(para_stripped) > max_chars:
776: sentences = para_stripped.split(". ")
777: for sentence in sentences:
778: if len(current_chunk) + len(sentence) + 2 < max_chars:
779: current_chunk += sentence + ". "
780: else:
781: if current_chunk:
782: chunks.append(current_chunk.strip())
783: current_chunk = sentence + ". "
784: # If adding this paragraph would exceed limit, start new chunk
785: elif len(current_chunk) + len(para_stripped) + 2 > max_chars:
786: if current_chunk:
787: chunks.append(current_chunk.strip())
788: current_chunk = para_stripped + " "
789: else:
790: current_chunk += para_stripped + " "
791:
792: # Don't forget the last chunk
793: if current_chunk:
794: chunks.append(current_chunk.strip())
795:
796: return chunks
797:
798:
799: def edit_chunk_for_speech(
800: chunk: str,
801: title: str | None = None,
802: *,
803: is_first: bool = False,
804: is_last: bool = False,
805: ) -> str:
806: """Use LLM to lightly edit a single chunk for speech.
807:
808: Raises:
809: ValueError: If no content is returned from LLM.
810: """
811: system_prompt = (
812: "You are a podcast script editor. Your job is to lightly edit text "
813: "to make it sound natural when spoken aloud.\n\n"
814: "Guidelines:\n"
815: )
816: system_prompt += """
817: - Remove URLs and email addresses, replacing with descriptive phrases
818: - Convert bullet points and lists into flowing sentences
819: - Fix any awkward phrasing for speech
820: - Remove references like "click here" or "see below"
821: - Keep edits minimal - preserve the original content and style
822: - Do NOT add commentary or explanations
823: - Return ONLY the edited text, no JSON or formatting
824: """
825:
826: user_prompt = chunk
827:
828: # Add intro/outro if needed
829: if is_first and title:
830: user_prompt = (
831: f"Add a brief intro mentioning this is an audio version of "
832: f"'{title}', then edit this text:\n\n{chunk}"
833: )
834: elif is_last:
835: user_prompt = f"Edit this text and add a brief closing:\n\n{chunk}"
836:
837: try:
838: client: openai.OpenAI = openai.OpenAI(api_key=OPENAI_API_KEY)
839: response = client.chat.completions.create(
840: model="gpt-4o-mini",
841: messages=[
842: {"role": "system", "content": system_prompt},
843: {"role": "user", "content": user_prompt},
844: ],
845: temperature=0.3, # Lower temperature for more consistent edits
846: max_tokens=4000,
847: )
848:
849: content = response.choices[0].message.content
850: if not content:
851: msg = "No content returned from LLM"
852: raise ValueError(msg) # noqa: TRY301
853:
854: # Ensure the chunk isn't too long
855: max_chunk_length = 4000
856: if len(content) > max_chunk_length:
857: # Truncate at sentence boundary
858: sentences = content.split(". ")
859: truncated = ""
860: for sentence in sentences:
861: if len(truncated) + len(sentence) + 2 < max_chunk_length:
862: truncated += sentence + ". "
863: else:
864: break
865: content = truncated.strip()
866:
867: except Exception:
868: logger.exception("LLM chunk editing failed")
869: raise
870: else:
871: return content
872:
873:
Biz/PodcastItLater/Worker.py @874-1035
874: def parse_datetime_with_timezone(created_at: str | datetime) -> datetime:
875: """Parse datetime string and ensure it has timezone info."""
876: if isinstance(created_at, str):
877: # Handle timezone-aware datetime strings
878: if created_at.endswith("Z"):
879: created_at = created_at[:-1] + "+00:00"
880: last_attempt = datetime.fromisoformat(created_at)
881: if last_attempt.tzinfo is None:
882: last_attempt = last_attempt.replace(tzinfo=timezone.utc)
883: else:
884: last_attempt = created_at
885: if last_attempt.tzinfo is None:
886: last_attempt = last_attempt.replace(tzinfo=timezone.utc)
887: return last_attempt
888:
889:
890: def should_retry_job(job: dict[str, Any], max_retries: int) -> bool:
891: """Check if a job should be retried based on retry count and backoff time.
892:
893: Uses exponential backoff to determine if enough time has passed.
894: """
895: retry_count = job["retry_count"]
896: if retry_count >= max_retries:
897: return False
898:
899: # Exponential backoff: 30s, 60s, 120s
900: backoff_time = 30 * (2**retry_count)
901: last_attempt = parse_datetime_with_timezone(job["created_at"])
902: time_since_attempt = datetime.now(tz=timezone.utc) - last_attempt
903:
904: return time_since_attempt > timedelta(seconds=backoff_time)
905:
906:
907: def process_pending_jobs(
908: processor: ArticleProcessor,
909: ) -> None:
910: """Process all pending jobs."""
911: pending_jobs = Core.Database.get_pending_jobs(
912: limit=5,
913: )
914:
915: for job in pending_jobs:
916: if processor.shutdown_handler.is_shutdown_requested():
917: logger.info("Shutdown requested, stopping job processing")
918: break
919:
920: current_job = job["id"]
921: try:
922: processor.process_job(job)
923: except Exception as e:
924: # Ensure job is marked as error even if process_job didn't handle it
925: logger.exception("Failed to process job: %d", current_job)
926: # Check if job is still in processing state
927: current_status = Core.Database.get_job_by_id(
928: current_job,
929: )
930: if current_status and current_status.get("status") == "processing":
931: Core.Database.update_job_status(
932: current_job,
933: "error",
934: str(e),
935: )
936: continue
937:
938:
939: def process_retryable_jobs() -> None:
940: """Check and retry failed jobs with exponential backoff."""
941: retryable_jobs = Core.Database.get_retryable_jobs(
942: MAX_RETRIES,
943: )
944:
945: for job in retryable_jobs:
946: if should_retry_job(job, MAX_RETRIES):
947: logger.info(
948: "Retrying job %d (attempt %d)",
949: job["id"],
950: job["retry_count"] + 1,
951: )
952: Core.Database.update_job_status(
953: job["id"],
954: "pending",
955: )
956:
957:
958: def check_memory_usage() -> int | Any:
959: """Check current memory usage percentage."""
960: try:
961: process = psutil.Process()
962: # this returns an int but psutil is untyped
963: return process.memory_percent()
964: except (psutil.Error, OSError):
965: logger.warning("Failed to check memory usage")
966: return 0.0
967:
968:
969: def cleanup_stale_jobs() -> None:
970: """Reset jobs stuck in processing state on startup."""
971: with Core.Database.get_connection() as conn:
972: cursor = conn.cursor()
973: cursor.execute(
974: """
975: UPDATE queue
976: SET status = 'pending'
977: WHERE status = 'processing'
978: """,
979: )
980: affected = cursor.rowcount
981: conn.commit()
982:
983: if affected > 0:
984: logger.info(
985: "Reset %d stale jobs from processing to pending",
986: affected,
987: )
988:
989:
990: def main_loop() -> None:
991: """Poll for jobs and process them in a continuous loop."""
992: shutdown_handler = ShutdownHandler()
993: processor = ArticleProcessor(shutdown_handler)
994:
995: # Clean up any stale jobs from previous runs
996: cleanup_stale_jobs()
997:
998: logger.info("Worker started, polling for jobs...")
999:
1000: while not shutdown_handler.is_shutdown_requested():
1001: try:
1002: # Process pending jobs
1003: process_pending_jobs(processor)
1004: process_retryable_jobs()
1005:
1006: # Check if there's any work
1007: pending_jobs = Core.Database.get_pending_jobs(
1008: limit=1,
1009: )
1010: retryable_jobs = Core.Database.get_retryable_jobs(
1011: MAX_RETRIES,
1012: )
1013:
1014: if not pending_jobs and not retryable_jobs:
1015: logger.debug("No jobs to process, sleeping...")
1016:
1017: except Exception:
1018: logger.exception("Error in main loop")
1019:
1020: # Use interruptible sleep
1021: if not shutdown_handler.is_shutdown_requested():
1022: shutdown_handler.shutdown_requested.wait(timeout=POLL_INTERVAL)
1023:
1024: # Graceful shutdown
1025: current_job = shutdown_handler.get_current_job()
1026: if current_job:
1027: logger.info(
1028: "Waiting for job %d to complete before shutdown...",
1029: current_job,
1030: )
1031: # The job will complete or be reset to pending
1032:
1033: logger.info("Worker shutdown complete")
1034:
1035:
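Two behaviors to preserve when these helpers move into Jobs.py: should_retry_job's backoff is 30 * 2**retry_count seconds (the 30s/60s/120s progression noted in its comment), and parse_datetime_with_timezone rewrites a trailing "Z" to "+00:00" before parsing. Quick REPL checks:

for retry_count in range(3):
    print(retry_count, 30 * (2**retry_count))  # 0 30, 1 60, 2 120

print(parse_datetime_with_timezone("2024-01-15T12:00:00Z"))
# 2024-01-15 12:00:00+00:00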
Biz/PodcastItLater/Worker.py @1036-1060
1036: def move() -> None:
1037: """Make the worker move."""
1038: try:
1039: # Initialize database
1040: Core.Database.init_db()
1041:
1042: # Start main processing loop
1043: main_loop()
1044:
1045: except KeyboardInterrupt:
1046: logger.info("Worker stopped by user")
1047: except Exception:
1048: logger.exception("Worker crashed")
1049: raise
1050:
1051:
1052: class TestArticleExtraction(Test.TestCase):
1053: """Test article extraction functionality."""
1054:
1055: def test_extract_valid_article(self) -> None:
1056: """Extract from well-formed HTML."""
1057: # Mock trafilatura.fetch_url and extract
1058: mock_html = (
1059: "<html><body><h1>Test Article</h1><p>Content here</p></body></html>"
1060: )
Biz/PodcastItLater/Worker.py @2179-2199
2179: def test() -> None:
2180:     """Run the tests."""
2181:     Test.run(
2182:         App.Area.Test,
2183:         [
2184:             TestArticleExtraction,
2185:             TestTextToSpeech,
2186:             TestMemoryEfficiency,
2187:             TestJobProcessing,
2188:             TestWorkerErrorHandling,
2189:             TestTextChunking,
2190:         ],
2191:     )
2192:
2193:
2194: def main() -> None:
2195:     """Entry point for the worker."""
2196:     if "test" in sys.argv:
2197:         test()
2198:     else:
2199:         move()
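This test() list is the main thing to rewire after the split. A hypothetical post-split version, assuming Worker.py imports the new modules as Processor, TextProcessing, and Jobs (exact module and symbol names to be confirmed):

def test() -> None:
    """Run the tests."""
    Test.run(
        App.Area.Test,
        [
            Processor.TestArticleExtraction,
            Processor.TestTextToSpeech,
            TestMemoryEfficiency,
            Jobs.TestJobProcessing,
            TestWorkerErrorHandling,
            TextProcessing.TestTextChunking,
        ],
    )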
Now let me check where the ArticleProcessor class ends:
Biz/PodcastItLater/Worker.py @700-725
700:
701: logger.info(
702: "Successfully processed job %d -> episode %d",
703: job_id,
704: episode_id,
705: )
706:
707: except Exception as e:
708: error_msg = str(e)
709: logger.exception("Job %d failed: %s", job_id, error_msg)
710: Core.Database.update_job_status(
711: job_id,
712: "error",
713: error_msg,
714: )
715: raise
716: finally:
717: # Clear current job
718: self.shutdown_handler.set_current_job(None)
719:
720:
721: def prepare_text_for_tts(text: str, title: str) -> list[str]:
722: """Use LLM to prepare text for TTS, returning chunks ready for speech.
723:
724: First splits text mechanically, then has LLM edit each chunk.
725: """
Now I have enough context. Let me check the test classes to understand which tests go with which functionality:
Biz/PodcastItLater/Worker.py @1052-1195
1052: class TestArticleExtraction(Test.TestCase):
1053: """Test article extraction functionality."""
1054:
1055: def test_extract_valid_article(self) -> None:
1056: """Extract from well-formed HTML."""
1057: # Mock trafilatura.fetch_url and extract
1058: mock_html = (
1059: "<html><body><h1>Test Article</h1><p>Content here</p></body></html>"
1060: )
1061: mock_result = json.dumps({
1062: "title": "Test Article",
1063: "text": "Content here",
1064: })
1065:
1066: with (
1067: unittest.mock.patch(
1068: "trafilatura.fetch_url",
1069: return_value=mock_html,
1070: ),
1071: unittest.mock.patch(
1072: "trafilatura.extract",
1073: return_value=mock_result,
1074: ),
1075: ):
1076: title, content, author, pub_date = (
1077: ArticleProcessor.extract_article_content(
1078: "https://example.com",
1079: )
1080: )
1081:
1082: self.assertEqual(title, "Test Article")
1083: self.assertEqual(content, "Content here")
1084: self.assertIsNone(author)
1085: self.assertIsNone(pub_date)
1086:
1087: def test_extract_missing_title(self) -> None:
1088: """Handle articles without titles."""
1089: mock_html = "<html><body><p>Content without title</p></body></html>"
1090: mock_result = json.dumps({"text": "Content without title"})
1091:
1092: with (
1093: unittest.mock.patch(
1094: "trafilatura.fetch_url",
1095: return_value=mock_html,
1096: ),
1097: unittest.mock.patch(
1098: "trafilatura.extract",
1099: return_value=mock_result,
1100: ),
1101: ):
1102: title, content, author, pub_date = (
1103: ArticleProcessor.extract_article_content(
1104: "https://example.com",
1105: )
1106: )
1107:
1108: self.assertEqual(title, "Untitled Article")
1109: self.assertEqual(content, "Content without title")
1110: self.assertIsNone(author)
1111: self.assertIsNone(pub_date)
1112:
1113: def test_extract_empty_content(self) -> None:
1114: """Handle empty articles."""
1115: mock_html = "<html><body></body></html>"
1116: mock_result = json.dumps({"title": "Empty Article", "text": ""})
1117:
1118: with (
1119: unittest.mock.patch(
1120: "trafilatura.fetch_url",
1121: return_value=mock_html,
1122: ),
1123: unittest.mock.patch(
1124: "trafilatura.extract",
1125: return_value=mock_result,
1126: ),
1127: pytest.raises(ValueError, match="No content extracted") as cm,
1128: ):
1129: ArticleProcessor.extract_article_content(
1130: "https://example.com",
1131: )
1132:
1133: self.assertIn("No content extracted", str(cm.value))
1134:
1135: def test_extract_network_error(self) -> None:
1136: """Handle connection failures."""
1137: with (
1138: unittest.mock.patch("trafilatura.fetch_url", return_value=None),
1139: pytest.raises(ValueError, match="Failed to download") as cm,
1140: ):
1141: ArticleProcessor.extract_article_content("https://example.com")
1142:
1143: self.assertIn("Failed to download", str(cm.value))
1144:
1145: @staticmethod
1146: def test_extract_timeout() -> None:
1147: """Handle slow responses."""
1148: with (
1149: unittest.mock.patch(
1150: "trafilatura.fetch_url",
1151: side_effect=TimeoutError("Timeout"),
1152: ),
1153: pytest.raises(TimeoutError),
1154: ):
1155: ArticleProcessor.extract_article_content("https://example.com")
1156:
1157: def test_content_sanitization(self) -> None:
1158: """Remove unwanted elements."""
1159: mock_html = """
1160: <html><body>
1161: <h1>Article</h1>
1162: <p>Good content</p>
1163: <script>alert('bad')</script>
1164: <table><tr><td>data</td></tr></table>
1165: </body></html>
1166: """
1167: mock_result = json.dumps({
1168: "title": "Article",
1169: "text": "Good content", # Tables and scripts removed
1170: })
1171:
1172: with (
1173: unittest.mock.patch(
1174: "trafilatura.fetch_url",
1175: return_value=mock_html,
1176: ),
1177: unittest.mock.patch(
1178: "trafilatura.extract",
1179: return_value=mock_result,
1180: ),
1181: ):
1182: _title, content, _author, _pub_date = (
1183: ArticleProcessor.extract_article_content(
1184: "https://example.com",
1185: )
1186: )
1187:
1188: self.assertEqual(content, "Good content")
1189: self.assertNotIn("script", content)
1190: self.assertNotIn("table", content)
1191:
1192:
1193: class TestTextToSpeech(Test.TestCase):
1194: """Test text-to-speech functionality."""
1195:
Let me check the line counts of the test classes to better understand what to move:
grep -n "^class Test" Biz/PodcastItLater/Worker.py1052:class TestArticleExtraction(Test.TestCase): 1193:class TestTextToSpeech(Test.TestCase): 1622:class TestIntroOutro(Test.TestCase): 1708:class TestMemoryEfficiency(Test.TestCase): 1761:class TestJobProcessing(Test.TestCase): 2068:class TestWorkerErrorHandling(Test.TestCase): 2156:class TestTextChunking(Test.TestCase):
Now let me understand the plan from the task:
Based on the task description, I need to:
1. Create Worker/Processor.py with ArticleProcessor class + TestArticleExtraction, TestTextToSpeech, TestIntroOutro
2. Create Worker/TextProcessing.py with text processing functions + TestTextChunking
3. Create Worker/Jobs.py with job management functions + TestJobProcessing
4. Keep in Worker.py: imports, config, ShutdownHandler, move(), test(), main(), TestMemoryEfficiency, TestWorkerErrorHandling
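As a concrete example for item 2, the TextProcessing.py header might start roughly like this (a sketch only: it assumes the repo's "# : dep" header convention carries over and that the moved functions need just openai plus logging and the test harness; the real dependency list must be checked against the moved code):

"""Text preparation and chunking for TTS."""

# : dep openai
import logging
import Omni.Log as Log
import Omni.Test as Test
import openai
import os

logger = logging.getLogger(__name__)
Log.setup(logger)

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")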
Let me first create the directory structure:
mkdir -p Biz/PodcastItLater/Worker
Now let me read the ArticleProcessor class completely to extract it properly:
Biz/PodcastItLater/Worker.py @103-720
103: class ArticleProcessor:
104: """Handles the complete article-to-podcast conversion pipeline."""
105:
106: def __init__(self, shutdown_handler: ShutdownHandler) -> None:
107: """Initialize the processor with required services.
108:
109: Raises:
110: ValueError: If OPENAI_API_KEY environment variable is not set.
111: """
112: if not OPENAI_API_KEY:
113: msg = "OPENAI_API_KEY environment variable is required"
114: raise ValueError(msg)
115:
116: self.openai_client: openai.OpenAI = openai.OpenAI(
117: api_key=OPENAI_API_KEY,
118: )
119: self.shutdown_handler = shutdown_handler
120:
121: # Initialize S3 client for Digital Ocean Spaces
122: if all([S3_ENDPOINT, S3_BUCKET, S3_ACCESS_KEY, S3_SECRET_KEY]):
123: self.s3_client: Any = boto3.client(
124: "s3",
125: endpoint_url=S3_ENDPOINT,
126: aws_access_key_id=S3_ACCESS_KEY,
127: aws_secret_access_key=S3_SECRET_KEY,
128: )
129: else:
130: logger.warning("S3 configuration incomplete, uploads will fail")
131: self.s3_client = None
132:
133: @staticmethod
134: def extract_article_content(
135: url: str,
136: ) -> tuple[str, str, str | None, str | None]:
137: """Extract title, content, author, and date from article URL.
138:
139: Returns:
140: tuple: (title, content, author, publication_date)
141:
142: Raises:
143: ValueError: If content cannot be downloaded, extracted, or large.
144: """
145: try:
146: downloaded = trafilatura.fetch_url(url)
147: if not downloaded:
148: msg = f"Failed to download content from {url}"
149: raise ValueError(msg) # noqa: TRY301
150:
151: # Check size before processing
152: if (
153: len(downloaded) > MAX_ARTICLE_SIZE * 4
154: ): # Rough HTML to text ratio
155: msg = f"Article too large: {len(downloaded)} bytes"
156: raise ValueError(msg) # noqa: TRY301
157:
158: # Extract with metadata
159: result = trafilatura.extract(
160: downloaded,
161: include_comments=False,
162: include_tables=False,
163: with_metadata=True,
164: output_format="json",
165: )
166:
167: if not result:
168: msg = f"Failed to extract content from {url}"
169: raise ValueError(msg) # noqa: TRY301
170:
171: data = json.loads(result)
172:
173: title = data.get("title", "Untitled Article")
174: content = data.get("text", "")
175: author = data.get("author")
176: pub_date = data.get("date")
177:
178: if not content:
179: msg = f"No content extracted from {url}"
180: raise ValueError(msg) # noqa: TRY301
181:
182: # Enforce content size limit
183: if len(content) > MAX_ARTICLE_SIZE:
184: logger.warning(
185: "Article content truncated from %d to %d characters",
186: len(content),
187: MAX_ARTICLE_SIZE,
188: )
189: content = content[:MAX_ARTICLE_SIZE]
190:
191: logger.info(
192: "Extracted article: %s (%d chars, author: %s, date: %s)",
193: title,
194: len(content),
195: author or "unknown",
196: pub_date or "unknown",
197: )
198: except Exception:
199: logger.exception("Failed to extract content from %s", url)
200: raise
201: else:
202: return title, content, author, pub_date
203:
204: def text_to_speech(
205: self,
206: text: str,
207: title: str,
208: author: str | None = None,
209: pub_date: str | None = None,
210: ) -> bytes:
211: """Convert text to speech with intro/outro using OpenAI TTS API.
212:
213: Uses parallel processing for chunks while maintaining order.
214: Adds intro with metadata and outro with attribution.
215:
216: Args:
217: text: Article content to convert
218: title: Article title
219: author: Article author (optional)
220: pub_date: Publication date (optional)
221:
222: Raises:
223: ValueError: If no chunks are generated from text.
224: """
225: try:
226: # Generate intro audio
227: intro_text = self._create_intro_text(title, author, pub_date)
228: intro_audio = self._generate_tts_segment(intro_text)
229:
230: # Generate outro audio
231: outro_text = self._create_outro_text(title, author)
232: outro_audio = self._generate_tts_segment(outro_text)
233:
234: # Use LLM to prepare and chunk the main content
235: chunks = prepare_text_for_tts(text, title)
236:
237: if not chunks:
238: msg = "No chunks generated from text"
239: raise ValueError(msg) # noqa: TRY301
240:
241: logger.info("Processing %d chunks for TTS", len(chunks))
242:
243: # Check memory before parallel processing
244: mem_usage = check_memory_usage()
245: if mem_usage > MEMORY_THRESHOLD - 20: # Leave 20% buffer
246: logger.warning(
247: "High memory usage (%.1f%%), falling back to serial "
248: "processing",
249: mem_usage,
250: )
251: content_audio_bytes = self._text_to_speech_serial(chunks)
252: else:
253: # Determine max workers
254: max_workers = min(
255: 4, # Reasonable limit to avoid rate limiting
256: len(chunks), # No more workers than chunks
257: max(1, psutil.cpu_count() // 2), # Use half of CPU cores
258: )
259:
260: logger.info(
261: "Using %d workers for parallel TTS processing",
262: max_workers,
263: )
264:
265: # Process chunks in parallel
266: chunk_results: list[tuple[int, bytes]] = []
267:
268: with concurrent.futures.ThreadPoolExecutor(
269: max_workers=max_workers,
270: ) as executor:
271: # Submit all chunks for processing
272: future_to_index = {
273: executor.submit(self._process_tts_chunk, chunk, i): i
274: for i, chunk in enumerate(chunks)
275: }
276:
277: # Collect results as they complete
278: for future in concurrent.futures.as_completed(
279: future_to_index,
280: ):
281: index = future_to_index[future]
282: try:
283: audio_data = future.result()
284: chunk_results.append((index, audio_data))
285: except Exception:
286: logger.exception(
287: "Failed to process chunk %d",
288: index,
289: )
290: raise
291:
292: # Sort results by index to maintain order
293: chunk_results.sort(key=operator.itemgetter(0))
294:
295: # Combine audio chunks
296: content_audio_bytes = self._combine_audio_chunks([
297: data for _, data in chunk_results
298: ])
299:
300: # Combine intro, content, and outro with pauses
301: return ArticleProcessor._combine_intro_content_outro(
302: intro_audio,
303: content_audio_bytes,
304: outro_audio,
305: )
306:
307: except Exception:
308: logger.exception("TTS generation failed")
309: raise
310:
311: @staticmethod
312: def _create_intro_text(
313: title: str,
314: author: str | None,
315: pub_date: str | None,
316: ) -> str:
317: """Create intro text with available metadata."""
318: parts = [f"Title: {title}"]
319:
320: if author:
321: parts.append(f"Author: {author}")
322:
323: if pub_date:
324: parts.append(f"Published: {pub_date}")
325:
326: return ". ".join(parts) + "."
327:
328: @staticmethod
329: def _create_outro_text(title: str, author: str | None) -> str:
330: """Create outro text with attribution."""
331: if author:
332: return (
333: f"This has been an audio version of {title} "
334: f"by {author}, created using Podcast It Later."
335: )
336: return (
337: f"This has been an audio version of {title}, "
338: "created using Podcast It Later."
339: )
340:
341: def _generate_tts_segment(self, text: str) -> bytes:
342: """Generate TTS audio for a single segment (intro/outro).
343:
344: Args:
345: text: Text to convert to speech
346:
347: Returns:
348: MP3 audio bytes
349: """
350: response = self.openai_client.audio.speech.create(
351: model=TTS_MODEL,
352: voice=TTS_VOICE,
353: input=text,
354: )
355: return response.content
356:
357: @staticmethod
358: def _combine_intro_content_outro(
359: intro_audio: bytes,
360: content_audio: bytes,
361: outro_audio: bytes,
362: ) -> bytes:
363: """Combine intro, content, and outro with crossfades.
364:
365: Args:
366: intro_audio: MP3 bytes for intro
367: content_audio: MP3 bytes for main content
368: outro_audio: MP3 bytes for outro
369:
370: Returns:
371: Combined MP3 audio bytes
372: """
373: # Load audio segments
374: intro = AudioSegment.from_mp3(io.BytesIO(intro_audio))
375: content = AudioSegment.from_mp3(io.BytesIO(content_audio))
376: outro = AudioSegment.from_mp3(io.BytesIO(outro_audio))
377:
378: # Create bridge silence (pause + 2 * crossfade to account for overlap)
379: bridge = AudioSegment.silent(
380: duration=PAUSE_DURATION + 2 * CROSSFADE_DURATION
381: )
382:
383: def safe_append(
384: seg1: AudioSegment, seg2: AudioSegment, crossfade: int
385: ) -> AudioSegment:
386: if len(seg1) < crossfade or len(seg2) < crossfade:
387: logger.warning(
388: "Segment too short for crossfade (%dms vs %dms/%dms), using concatenation",
389: crossfade,
390: len(seg1),
391: len(seg2),
392: )
393: return seg1 + seg2
394: return seg1.append(seg2, crossfade=crossfade)
395:
396: # Combine segments with crossfades
397: # Intro -> Bridge -> Content -> Bridge -> Outro
398: # This effectively fades out the previous segment and fades in the next one
399: combined = safe_append(intro, bridge, CROSSFADE_DURATION)
400: combined = safe_append(combined, content, CROSSFADE_DURATION)
401: combined = safe_append(combined, bridge, CROSSFADE_DURATION)
402: combined = safe_append(combined, outro, CROSSFADE_DURATION)
403:
404: # Export to bytes
405: output = io.BytesIO()
406: combined.export(output, format="mp3")
407: return output.getvalue()
408:
409: def _process_tts_chunk(self, chunk: str, index: int) -> bytes:
410: """Process a single TTS chunk.
411:
412: Args:
413: chunk: Text to convert to speech
414: index: Chunk index for logging
415:
416: Returns:
417: Audio data as bytes
418: """
419: logger.info(
420: "Generating TTS for chunk %d (%d chars)",
421: index + 1,
422: len(chunk),
423: )
424:
425: response = self.openai_client.audio.speech.create(
426: model=TTS_MODEL,
427: voice=TTS_VOICE,
428: input=chunk,
429: response_format="mp3",
430: )
431:
432: return response.content
433:
434: @staticmethod
435: def _combine_audio_chunks(audio_chunks: list[bytes]) -> bytes:
436: """Combine multiple audio chunks with silence gaps.
437:
438: Args:
439: audio_chunks: List of audio data in order
440:
441: Returns:
442: Combined audio data
443: """
444: if not audio_chunks:
445: msg = "No audio chunks to combine"
446: raise ValueError(msg)
447:
448: # Create a temporary file for the combined audio
449: with tempfile.NamedTemporaryFile(
450: suffix=".mp3",
451: delete=False,
452: ) as temp_file:
453: temp_path = temp_file.name
454:
455: try:
456: # Start with the first chunk
457: combined_audio = AudioSegment.from_mp3(io.BytesIO(audio_chunks[0]))
458:
459: # Add remaining chunks with silence gaps
460: for chunk_data in audio_chunks[1:]:
461: chunk_audio = AudioSegment.from_mp3(io.BytesIO(chunk_data))
462: silence = AudioSegment.silent(duration=300) # 300ms gap
463: combined_audio = combined_audio + silence + chunk_audio
464:
465: # Export to file
466: combined_audio.export(temp_path, format="mp3", bitrate="128k")
467:
468: # Read back the combined audio
469: return Path(temp_path).read_bytes()
470:
471: finally:
472: # Clean up temp file
473: if Path(temp_path).exists():
474: Path(temp_path).unlink()
475:
476: def _text_to_speech_serial(self, chunks: list[str]) -> bytes:
477: """Fallback serial processing for high memory situations.
478:
479: This is the original serial implementation.
480: """
481: # Create a temporary file for streaming audio concatenation
482: with tempfile.NamedTemporaryFile(
483: suffix=".mp3",
484: delete=False,
485: ) as temp_file:
486: temp_path = temp_file.name
487:
488: try:
489: # Process first chunk
490: logger.info("Generating TTS for chunk 1/%d", len(chunks))
491: response = self.openai_client.audio.speech.create(
492: model=TTS_MODEL,
493: voice=TTS_VOICE,
494: input=chunks[0],
495: response_format="mp3",
496: )
497:
498: # Write first chunk directly to file
499: Path(temp_path).write_bytes(response.content)
500:
501: # Process remaining chunks
502: for i, chunk in enumerate(chunks[1:], 1):
503: logger.info(
504: "Generating TTS for chunk %d/%d (%d chars)",
505: i + 1,
506: len(chunks),
507: len(chunk),
508: )
509:
510: response = self.openai_client.audio.speech.create(
511: model=TTS_MODEL,
512: voice=TTS_VOICE,
513: input=chunk,
514: response_format="mp3",
515: )
516:
517: # Append to existing file with silence gap
518: # Load only the current segment
519: current_segment = AudioSegment.from_mp3(
520: io.BytesIO(response.content),
521: )
522:
523: # Load existing audio, append, and save back
524: existing_audio = AudioSegment.from_mp3(temp_path)
525: silence = AudioSegment.silent(duration=300)
526: combined = existing_audio + silence + current_segment
527:
528: # Export back to the same file
529: combined.export(temp_path, format="mp3", bitrate="128k")
530:
531: # Force garbage collection to free memory
532: del existing_audio, current_segment, combined
533:
534: # Small delay between API calls
535: if i < len(chunks) - 1:
536: time.sleep(0.5)
537:
538: # Read final result
539: audio_data = Path(temp_path).read_bytes()
540:
541: logger.info(
542: "Generated combined TTS audio: %d bytes",
543: len(audio_data),
544: )
545: return audio_data
546:
547: finally:
548: # Clean up temp file
549: temp_file_path = Path(temp_path)
550: if temp_file_path.exists():
551: temp_file_path.unlink()
552:
553: def upload_to_s3(self, audio_data: bytes, filename: str) -> str:
554: """Upload audio file to S3-compatible storage and return public URL.
555:
556: Raises:
557: ValueError: If S3 client is not configured.
558: ClientError: If S3 upload fails.
559: """
560: if not self.s3_client:
561: msg = "S3 client not configured"
562: raise ValueError(msg)
563:
564: try:
565: # Upload file using streaming to minimize memory usage
566: audio_stream = io.BytesIO(audio_data)
567: self.s3_client.upload_fileobj(
568: audio_stream,
569: S3_BUCKET,
570: filename,
571: ExtraArgs={
572: "ContentType": "audio/mpeg",
573: "ACL": "public-read",
574: },
575: )
576:
577: # Construct public URL
578: audio_url = f"{S3_ENDPOINT}/{S3_BUCKET}/{filename}"
579: logger.info(
580: "Uploaded audio to: %s (%d bytes)",
581: audio_url,
582: len(audio_data),
583: )
584: except ClientError:
585: logger.exception("S3 upload failed")
586: raise
587: else:
588: return audio_url
589:
590: @staticmethod
591: def estimate_duration(audio_data: bytes) -> int:
592: """Estimate audio duration in seconds based on file size and bitrate."""
593: # Rough estimation: MP3 at 128kbps = ~16KB per second
594: estimated_seconds = len(audio_data) // 16000
595: return max(1, estimated_seconds) # Minimum 1 second
596:
597: @staticmethod
598: def generate_filename(job_id: int, title: str) -> str:
599: """Generate unique filename for audio file."""
600: timestamp = int(datetime.now(tz=timezone.utc).timestamp())
601: # Create safe filename from title
602: safe_title = "".join(
603: c for c in title if c.isalnum() or c in {" ", "-", "_"}
604: ).rstrip()
605: safe_title = safe_title.replace(" ", "_")[:50] # Limit length
606: return f"episode_{timestamp}_{job_id}_{safe_title}.mp3"
607:
608: def process_job(
609: self,
610: job: dict[str, Any],
611: ) -> None:
612: """Process a single job through the complete pipeline."""
613: job_id = job["id"]
614: url = job["url"]
615:
616: # Check memory before starting
617: mem_usage = check_memory_usage()
618: if mem_usage > MEMORY_THRESHOLD:
619: logger.warning(
620: "High memory usage (%.1f%%), deferring job %d",
621: mem_usage,
622: job_id,
623: )
624: return
625:
626: # Track current job for graceful shutdown
627: self.shutdown_handler.set_current_job(job_id)
628:
629: try:
630: logger.info("Processing job %d: %s", job_id, url)
631:
632: # Update status to processing
633: Core.Database.update_job_status(
634: job_id,
635: "processing",
636: )
637:
638: # Check for shutdown before each major step
639: if self.shutdown_handler.is_shutdown_requested():
640: logger.info("Shutdown requested, aborting job %d", job_id)
641: Core.Database.update_job_status(job_id, "pending")
642: return
643:
644: # Step 1: Extract article content
645: Core.Database.update_job_status(job_id, "extracting")
646: title, content, author, pub_date = (
647: ArticleProcessor.extract_article_content(url)
648: )
649:
650: if self.shutdown_handler.is_shutdown_requested():
651: logger.info("Shutdown requested, aborting job %d", job_id)
652: Core.Database.update_job_status(job_id, "pending")
653: return
654:
655: # Step 2: Generate audio with metadata
656: Core.Database.update_job_status(job_id, "synthesizing")
657: audio_data = self.text_to_speech(content, title, author, pub_date)
658:
659: if self.shutdown_handler.is_shutdown_requested():
660: logger.info("Shutdown requested, aborting job %d", job_id)
661: Core.Database.update_job_status(job_id, "pending")
662: return
663:
664: # Step 3: Upload to S3
665: Core.Database.update_job_status(job_id, "uploading")
666: filename = ArticleProcessor.generate_filename(job_id, title)
667: audio_url = self.upload_to_s3(audio_data, filename)
668:
669: # Step 4: Calculate duration
670: duration = ArticleProcessor.estimate_duration(audio_data)
671:
672: # Step 5: Create episode record
673: url_hash = Core.hash_url(url)
674: episode_id = Core.Database.create_episode(
675: title=title,
676: audio_url=audio_url,
677: duration=duration,
678: content_length=len(content),
679: user_id=job.get("user_id"),
680: author=job.get("author"), # Pass author from job
681: original_url=url, # Pass the original article URL
682: original_url_hash=url_hash,
683: )
684:
685: # Add episode to user's feed
686: user_id = job.get("user_id")
687: if user_id:
688: Core.Database.add_episode_to_user(user_id, episode_id)
689: Core.Database.track_episode_event(
690: episode_id,
691: "added",
692: user_id,
693: )
694:
695: # Step 6: Mark job as complete
696: Core.Database.update_job_status(
697: job_id,
698: "completed",
699: )
700:
701: logger.info(
702: "Successfully processed job %d -> episode %d",
703: job_id,
704: episode_id,
705: )
706:
707: except Exception as e:
708: error_msg = str(e)
709: logger.exception("Job %d failed: %s", job_id, error_msg)
710: Core.Database.update_job_status(
711: job_id,
712: "error",
713: error_msg,
714: )
715: raise
716: finally:
717: # Clear current job
718: self.shutdown_handler.set_current_job(None)
719:
720:
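Before the tests, for orientation: the pipeline above is driven like this (a usage sketch mirroring how the tests construct things; the job id is illustrative):

handler = ShutdownHandler()
processor = ArticleProcessor(handler)
job = Core.Database.get_job_by_id(42)  # 42 is an illustrative id
if job is None:
    msg = "no job found"
    raise RuntimeError(msg)
processor.process_job(job)  # extract -> TTS -> upload -> episode record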
Now let me read the test classes for ArticleProcessor:
Biz/PodcastItLater/Worker.py @1193-1707
1193: class TestTextToSpeech(Test.TestCase):
1194: """Test text-to-speech functionality."""
1195:
1196: def setUp(self) -> None:
1197: """Set up mocks."""
1198: # Mock OpenAI API key
1199: self.env_patcher = unittest.mock.patch.dict(
1200: os.environ,
1201: {"OPENAI_API_KEY": "test-key"},
1202: )
1203: self.env_patcher.start()
1204:
1205: # Mock OpenAI response
1206: self.mock_audio_response: unittest.mock.MagicMock = (
1207: unittest.mock.MagicMock()
1208: )
1209: self.mock_audio_response.content = b"fake-audio-data"
1210:
1211: # Mock AudioSegment to avoid ffmpeg issues in tests
1212: self.mock_audio_segment: unittest.mock.MagicMock = (
1213: unittest.mock.MagicMock()
1214: )
1215: self.mock_audio_segment.export.return_value = None
1216: self.audio_segment_patcher = unittest.mock.patch(
1217: "pydub.AudioSegment.from_mp3",
1218: return_value=self.mock_audio_segment,
1219: )
1220: self.audio_segment_patcher.start()
1221:
1222: # Mock the concatenation operations
1223: self.mock_audio_segment.__add__.return_value = self.mock_audio_segment
1224:
1225: def tearDown(self) -> None:
1226: """Clean up mocks."""
1227: self.env_patcher.stop()
1228: self.audio_segment_patcher.stop()
1229:
1230: def test_tts_generation(self) -> None:
1231: """Generate audio from text."""
1232:
1233: # Mock the export to write test audio data
1234: def mock_export(buffer: io.BytesIO, **_kwargs: typing.Any) -> None:
1235: buffer.write(b"test-audio-output")
1236: buffer.seek(0)
1237:
1238: self.mock_audio_segment.export.side_effect = mock_export
1239:
1240: # Mock OpenAI client
1241: mock_client = unittest.mock.MagicMock()
1242: mock_audio = unittest.mock.MagicMock()
1243: mock_speech = unittest.mock.MagicMock()
1244: mock_speech.create.return_value = self.mock_audio_response
1245: mock_audio.speech = mock_speech
1246: mock_client.audio = mock_audio
1247:
1248: with (
1249: unittest.mock.patch("openai.OpenAI", return_value=mock_client),
1250: unittest.mock.patch(
1251: "Biz.PodcastItLater.Worker.prepare_text_for_tts",
1252: return_value=["Test content"],
1253: ),
1254: ):
1255: shutdown_handler = ShutdownHandler()
1256: processor = ArticleProcessor(shutdown_handler)
1257: audio_data = processor.text_to_speech(
1258: "Test content",
1259: "Test Title",
1260: )
1261:
1262: self.assertIsInstance(audio_data, bytes)
1263: self.assertEqual(audio_data, b"test-audio-output")
1264:
1265: def test_tts_chunking(self) -> None:
1266: """Handle long articles with chunking."""
1267: long_text = "Long content " * 1000
1268: chunks = ["Chunk 1", "Chunk 2", "Chunk 3"]
1269:
1270: def mock_export(buffer: io.BytesIO, **_kwargs: typing.Any) -> None:
1271: buffer.write(b"test-audio-output")
1272: buffer.seek(0)
1273:
1274: self.mock_audio_segment.export.side_effect = mock_export
1275:
1276: # Mock AudioSegment.silent
1277: # Mock OpenAI client
1278: mock_client = unittest.mock.MagicMock()
1279: mock_audio = unittest.mock.MagicMock()
1280: mock_speech = unittest.mock.MagicMock()
1281: mock_speech.create.return_value = self.mock_audio_response
1282: mock_audio.speech = mock_speech
1283: mock_client.audio = mock_audio
1284:
1285: with (
1286: unittest.mock.patch("openai.OpenAI", return_value=mock_client),
1287: unittest.mock.patch(
1288: "Biz.PodcastItLater.Worker.prepare_text_for_tts",
1289: return_value=chunks,
1290: ),
1291: unittest.mock.patch(
1292: "pydub.AudioSegment.silent",
1293: return_value=self.mock_audio_segment,
1294: ),
1295: ):
1296: shutdown_handler = ShutdownHandler()
1297: processor = ArticleProcessor(shutdown_handler)
1298: audio_data = processor.text_to_speech(
1299: long_text,
1300: "Long Article",
1301: )
1302:
1303: # Should have called TTS for each chunk
1304: self.assertIsInstance(audio_data, bytes)
1305: self.assertEqual(audio_data, b"test-audio-output")
1306:
1307: def test_tts_empty_text(self) -> None:
1308: """Handle empty input."""
1309: with unittest.mock.patch(
1310: "Biz.PodcastItLater.Worker.prepare_text_for_tts",
1311: return_value=[],
1312: ):
1313: shutdown_handler = ShutdownHandler()
1314: processor = ArticleProcessor(shutdown_handler)
1315: with pytest.raises(ValueError, match="No chunks generated") as cm:
1316: processor.text_to_speech("", "Empty")
1317:
1318: self.assertIn("No chunks generated", str(cm.value))
1319:
1320: def test_tts_special_characters(self) -> None:
1321: """Handle unicode and special chars."""
1322: special_text = 'Unicode: 你好世界 Émojis: 🎙️📰 Special: <>&"'
1323:
1324: def mock_export(buffer: io.BytesIO, **_kwargs: typing.Any) -> None:
1325: buffer.write(b"test-audio-output")
1326: buffer.seek(0)
1327:
1328: self.mock_audio_segment.export.side_effect = mock_export
1329:
1330: # Mock OpenAI client
1331: mock_client = unittest.mock.MagicMock()
1332: mock_audio = unittest.mock.MagicMock()
1333: mock_speech = unittest.mock.MagicMock()
1334: mock_speech.create.return_value = self.mock_audio_response
1335: mock_audio.speech = mock_speech
1336: mock_client.audio = mock_audio
1337:
1338: with (
1339: unittest.mock.patch("openai.OpenAI", return_value=mock_client),
1340: unittest.mock.patch(
1341: "Biz.PodcastItLater.Worker.prepare_text_for_tts",
1342: return_value=[special_text],
1343: ),
1344: ):
1345: shutdown_handler = ShutdownHandler()
1346: processor = ArticleProcessor(shutdown_handler)
1347: audio_data = processor.text_to_speech(
1348: special_text,
1349: "Special",
1350: )
1351:
1352: self.assertIsInstance(audio_data, bytes)
1353: self.assertEqual(audio_data, b"test-audio-output")
1354:
1355: def test_llm_text_preparation(self) -> None:
1356: """Verify LLM editing."""
1357: # Test the actual text preparation functions
1358: chunks = split_text_into_chunks("Short text", max_chars=100)
1359: self.assertEqual(len(chunks), 1)
1360: self.assertEqual(chunks[0], "Short text")
1361:
1362: # Test long text splitting
1363: long_text = "Sentence one. " * 100
1364: chunks = split_text_into_chunks(long_text, max_chars=100)
1365: self.assertGreater(len(chunks), 1)
1366: for chunk in chunks:
1367: self.assertLessEqual(len(chunk), 100)
1368:
1369: @staticmethod
1370: def test_llm_failure_fallback() -> None:
1371: """Handle LLM API failures."""
1372: # Mock LLM failure
1373: with unittest.mock.patch("openai.OpenAI") as mock_openai:
1374: mock_client = mock_openai.return_value
1375: mock_client.chat.completions.create.side_effect = Exception(
1376: "API Error",
1377: )
1378:
1379: # Should fall back to raw text
1380: with pytest.raises(Exception, match="API Error"):
1381: edit_chunk_for_speech("Test chunk", "Title", is_first=True)
1382:
1383: def test_chunk_concatenation(self) -> None:
1384: """Verify audio joining."""
1385: # Mock multiple audio segments
1386: chunks = ["Chunk 1", "Chunk 2"]
1387:
1388: def mock_export(buffer: io.BytesIO, **_kwargs: typing.Any) -> None:
1389: buffer.write(b"test-audio-output")
1390: buffer.seek(0)
1391:
1392: self.mock_audio_segment.export.side_effect = mock_export
1393:
1394: # Mock OpenAI client
1395: mock_client = unittest.mock.MagicMock()
1396: mock_audio = unittest.mock.MagicMock()
1397: mock_speech = unittest.mock.MagicMock()
1398: mock_speech.create.return_value = self.mock_audio_response
1399: mock_audio.speech = mock_speech
1400: mock_client.audio = mock_audio
1401:
1402: with (
1403: unittest.mock.patch("openai.OpenAI", return_value=mock_client),
1404: unittest.mock.patch(
1405: "Biz.PodcastItLater.Worker.prepare_text_for_tts",
1406: return_value=chunks,
1407: ),
1408: unittest.mock.patch(
1409: "pydub.AudioSegment.silent",
1410: return_value=self.mock_audio_segment,
1411: ),
1412: ):
1413: shutdown_handler = ShutdownHandler()
1414: processor = ArticleProcessor(shutdown_handler)
1415: audio_data = processor.text_to_speech("Test", "Title")
1416:
1417: # Should produce combined audio
1418: self.assertIsInstance(audio_data, bytes)
1419: self.assertEqual(audio_data, b"test-audio-output")
1420:
1421: def test_parallel_tts_generation(self) -> None:
1422: """Test parallel TTS processing."""
1423: chunks = ["Chunk 1", "Chunk 2", "Chunk 3", "Chunk 4"]
1424:
1425: # Mock responses for each chunk
1426: mock_responses = []
1427: for i in range(len(chunks)):
1428: mock_resp = unittest.mock.MagicMock()
1429: mock_resp.content = f"audio-{i}".encode()
1430: mock_responses.append(mock_resp)
1431:
1432: # Mock OpenAI client
1433: mock_client = unittest.mock.MagicMock()
1434: mock_audio = unittest.mock.MagicMock()
1435: mock_speech = unittest.mock.MagicMock()
1436:
1437: # Make create return different responses for each call
1438: mock_speech.create.side_effect = mock_responses
1439: mock_audio.speech = mock_speech
1440: mock_client.audio = mock_audio
1441:
1442: # Mock AudioSegment operations
1443: mock_segment = unittest.mock.MagicMock()
1444: mock_segment.__add__.return_value = mock_segment
1445:
1446: def mock_export(path: str, **_kwargs: typing.Any) -> None:
1447: Path(path).write_bytes(b"combined-audio")
1448:
1449: mock_segment.export = mock_export
1450:
1451: with (
1452: unittest.mock.patch("openai.OpenAI", return_value=mock_client),
1453: unittest.mock.patch(
1454: "Biz.PodcastItLater.Worker.prepare_text_for_tts",
1455: return_value=chunks,
1456: ),
1457: unittest.mock.patch(
1458: "pydub.AudioSegment.from_mp3",
1459: return_value=mock_segment,
1460: ),
1461: unittest.mock.patch(
1462: "pydub.AudioSegment.silent",
1463: return_value=mock_segment,
1464: ),
1465: unittest.mock.patch(
1466: "Biz.PodcastItLater.Worker.check_memory_usage",
1467: return_value=50.0, # Normal memory usage
1468: ),
1469: ):
1470: shutdown_handler = ShutdownHandler()
1471: processor = ArticleProcessor(shutdown_handler)
1472: audio_data = processor.text_to_speech("Test content", "Test Title")
1473:
1474: # Verify all chunks were processed
1475: self.assertEqual(mock_speech.create.call_count, len(chunks))
1476: self.assertEqual(audio_data, b"combined-audio")
1477:
1478: def test_parallel_tts_high_memory_fallback(self) -> None:
1479: """Test fallback to serial processing when memory is high."""
1480: chunks = ["Chunk 1", "Chunk 2"]
1481:
1482: def mock_export(buffer: io.BytesIO, **_kwargs: typing.Any) -> None:
1483: buffer.write(b"serial-audio")
1484: buffer.seek(0)
1485:
1486: self.mock_audio_segment.export.side_effect = mock_export
1487:
1488: # Mock OpenAI client
1489: mock_client = unittest.mock.MagicMock()
1490: mock_audio = unittest.mock.MagicMock()
1491: mock_speech = unittest.mock.MagicMock()
1492: mock_speech.create.return_value = self.mock_audio_response
1493: mock_audio.speech = mock_speech
1494: mock_client.audio = mock_audio
1495:
1496: with (
1497: unittest.mock.patch("openai.OpenAI", return_value=mock_client),
1498: unittest.mock.patch(
1499: "Biz.PodcastItLater.Worker.prepare_text_for_tts",
1500: return_value=chunks,
1501: ),
1502: unittest.mock.patch(
1503: "Biz.PodcastItLater.Worker.check_memory_usage",
1504: return_value=65.0, # High memory usage
1505: ),
1506: unittest.mock.patch(
1507: "pydub.AudioSegment.silent",
1508: return_value=self.mock_audio_segment,
1509: ),
1510: ):
1511: shutdown_handler = ShutdownHandler()
1512: processor = ArticleProcessor(shutdown_handler)
1513: audio_data = processor.text_to_speech("Test content", "Test Title")
1514:
1515: # Should use serial processing
1516: self.assertEqual(audio_data, b"serial-audio")
1517:
1518: @staticmethod
1519: def test_parallel_tts_error_handling() -> None:
1520: """Test error handling in parallel TTS processing."""
1521: chunks = ["Chunk 1", "Chunk 2"]
1522:
1523: # Mock OpenAI client with one failure
1524: mock_client = unittest.mock.MagicMock()
1525: mock_audio = unittest.mock.MagicMock()
1526: mock_speech = unittest.mock.MagicMock()
1527:
1528: # First call succeeds, second fails
1529: mock_resp1 = unittest.mock.MagicMock()
1530: mock_resp1.content = b"audio-1"
1531: mock_speech.create.side_effect = [mock_resp1, Exception("API Error")]
1532:
1533: mock_audio.speech = mock_speech
1534: mock_client.audio = mock_audio
1535:
1536: # Set up the test context
1537: shutdown_handler = ShutdownHandler()
1538: processor = ArticleProcessor(shutdown_handler)
1539:
1540: with (
1541: unittest.mock.patch("openai.OpenAI", return_value=mock_client),
1542: unittest.mock.patch(
1543: "Biz.PodcastItLater.Worker.prepare_text_for_tts",
1544: return_value=chunks,
1545: ),
1546: unittest.mock.patch(
1547: "Biz.PodcastItLater.Worker.check_memory_usage",
1548: return_value=50.0,
1549: ),
1550: pytest.raises(Exception, match="API Error"),
1551: ):
1552: processor.text_to_speech("Test content", "Test Title")
1553:
1554: def test_parallel_tts_order_preservation(self) -> None:
1555: """Test that chunks are combined in the correct order."""
1556: chunks = ["First", "Second", "Third", "Fourth", "Fifth"]
1557:
1558: # Create mock responses with identifiable content
1559: mock_responses = []
1560: for chunk in chunks:
1561: mock_resp = unittest.mock.MagicMock()
1562: mock_resp.content = f"audio-{chunk}".encode()
1563: mock_responses.append(mock_resp)
1564:
1565: # Mock OpenAI client
1566: mock_client = unittest.mock.MagicMock()
1567: mock_audio = unittest.mock.MagicMock()
1568: mock_speech = unittest.mock.MagicMock()
1569: mock_speech.create.side_effect = mock_responses
1570: mock_audio.speech = mock_speech
1571: mock_client.audio = mock_audio
1572:
1573: # Track the order of segments being combined
1574: combined_order = []
1575:
1576: def mock_from_mp3(data: io.BytesIO) -> unittest.mock.MagicMock:
1577: content = data.read()
1578: combined_order.append(content.decode())
1579: segment = unittest.mock.MagicMock()
1580: segment.__add__.return_value = segment
1581: return segment
1582:
1583: mock_segment = unittest.mock.MagicMock()
1584: mock_segment.__add__.return_value = mock_segment
1585:
1586: def mock_export(path: str, **_kwargs: typing.Any) -> None:
1587: # Verify order is preserved
1588: expected_order = [f"audio-{chunk}" for chunk in chunks]
1589: if combined_order != expected_order:
1590: msg = f"Order mismatch: {combined_order} != {expected_order}"
1591: raise AssertionError(msg)
1592: Path(path).write_bytes(b"ordered-audio")
1593:
1594: mock_segment.export = mock_export
1595:
1596: with (
1597: unittest.mock.patch("openai.OpenAI", return_value=mock_client),
1598: unittest.mock.patch(
1599: "Biz.PodcastItLater.Worker.prepare_text_for_tts",
1600: return_value=chunks,
1601: ),
1602: unittest.mock.patch(
1603: "pydub.AudioSegment.from_mp3",
1604: side_effect=mock_from_mp3,
1605: ),
1606: unittest.mock.patch(
1607: "pydub.AudioSegment.silent",
1608: return_value=mock_segment,
1609: ),
1610: unittest.mock.patch(
1611: "Biz.PodcastItLater.Worker.check_memory_usage",
1612: return_value=50.0,
1613: ),
1614: ):
1615: shutdown_handler = ShutdownHandler()
1616: processor = ArticleProcessor(shutdown_handler)
1617: audio_data = processor.text_to_speech("Test content", "Test Title")
1618:
1619: self.assertEqual(audio_data, b"ordered-audio")
1620:
1621:
1622: class TestIntroOutro(Test.TestCase):
1623: """Test intro and outro generation with metadata."""
1624:
1625: def test_create_intro_text_full_metadata(self) -> None:
1626: """Test intro text creation with all metadata."""
1627: intro = ArticleProcessor._create_intro_text( # noqa: SLF001
1628: title="Test Article",
1629: author="John Doe",
1630: pub_date="2024-01-15",
1631: )
1632: self.assertIn("Title: Test Article", intro)
1633: self.assertIn("Author: John Doe", intro)
1634: self.assertIn("Published: 2024-01-15", intro)
1635:
1636: def test_create_intro_text_no_author(self) -> None:
1637: """Test intro text without author."""
1638: intro = ArticleProcessor._create_intro_text( # noqa: SLF001
1639: title="Test Article",
1640: author=None,
1641: pub_date="2024-01-15",
1642: )
1643: self.assertIn("Title: Test Article", intro)
1644: self.assertNotIn("Author:", intro)
1645: self.assertIn("Published: 2024-01-15", intro)
1646:
1647: def test_create_intro_text_minimal(self) -> None:
1648: """Test intro text with only title."""
1649: intro = ArticleProcessor._create_intro_text( # noqa: SLF001
1650: title="Test Article",
1651: author=None,
1652: pub_date=None,
1653: )
1654: self.assertEqual(intro, "Title: Test Article.")
1655:
1656: def test_create_outro_text_with_author(self) -> None:
1657: """Test outro text with author."""
1658: outro = ArticleProcessor._create_outro_text( # noqa: SLF001
1659: title="Test Article",
1660: author="Jane Smith",
1661: )
1662: self.assertIn("Test Article", outro)
1663: self.assertIn("Jane Smith", outro)
1664: self.assertIn("Podcast It Later", outro)
1665:
1666: def test_create_outro_text_no_author(self) -> None:
1667: """Test outro text without author."""
1668: outro = ArticleProcessor._create_outro_text( # noqa: SLF001
1669: title="Test Article",
1670: author=None,
1671: )
1672: self.assertIn("Test Article", outro)
1673: self.assertNotIn("by", outro)
1674: self.assertIn("Podcast It Later", outro)
1675:
1676: def test_extract_with_metadata(self) -> None:
1677: """Test that extraction returns metadata."""
1678: mock_html = "<html><body><p>Content</p></body></html>"
1679: mock_result = json.dumps({
1680: "title": "Test Article",
1681: "text": "Article content",
1682: "author": "Test Author",
1683: "date": "2024-01-15",
1684: })
1685:
1686: with (
1687: unittest.mock.patch(
1688: "trafilatura.fetch_url",
1689: return_value=mock_html,
1690: ),
1691: unittest.mock.patch(
1692: "trafilatura.extract",
1693: return_value=mock_result,
1694: ),
1695: ):
1696: title, content, author, pub_date = (
1697: ArticleProcessor.extract_article_content(
1698: "https://example.com",
1699: )
1700: )
1701:
1702: self.assertEqual(title, "Test Article")
1703: self.assertEqual(content, "Article content")
1704: self.assertEqual(author, "Test Author")
1705: self.assertEqual(pub_date, "2024-01-15")
1706:
1707:
Now let me read the TestTextChunking class:
Biz/PodcastItLater/Worker.py @2156-2178
2156: class TestTextChunking(Test.TestCase):
2157: """Test text chunking edge cases."""
2158:
2159: def test_split_text_single_long_word(self) -> None:
2160: """Handle text with a single word exceeding limit."""
2161: long_word = "a" * 4000
2162: chunks = split_text_into_chunks(long_word, max_chars=3000)
2163:
2164: # Should keep it as one chunk or split?
2165: # The current implementation does not split words
2166: self.assertEqual(len(chunks), 1)
2167: self.assertEqual(len(chunks[0]), 4000)
2168:
2169: def test_split_text_no_sentence_boundaries(self) -> None:
2170: """Handle long text with no sentence boundaries."""
2171: text = "word " * 1000  # 5000 chars
2172: chunks = split_text_into_chunks(text, max_chars=3000)
2173:
2174: # Should keep it as one chunk as it can't split by ". "
2175: self.assertEqual(len(chunks), 1)
2176: self.assertGreater(len(chunks[0]), 3000)
2177:
2178:
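These two edge cases pin down the chunking contract: splitting happens only on ". " sentence boundaries, and an unsplittable span is kept whole even past the limit. A minimal implementation consistent with these tests would look roughly like this (an illustration, not necessarily the project's actual code):

def split_text_into_chunks(text: str, max_chars: int = 3000) -> list[str]:
    """Split on ". " boundaries; never break inside a sentence or word."""
    if len(text) <= max_chars:
        return [text]
    chunks: list[str] = []
    current = ""
    for sentence in text.split(". "):
        candidate = f"{current}. {sentence}" if current else sentence
        if len(candidate) <= max_chars:
            current = candidate
        else:
            if current:
                chunks.append(current)
            current = sentence  # kept whole even when longer than max_chars
    if current:
        chunks.append(current)
    return chunks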
Now let me check TestJobProcessing, TestMemoryEfficiency, and TestWorkerErrorHandling:
Biz/PodcastItLater/Worker.py @1708-1850
1708: class TestMemoryEfficiency(Test.TestCase):
1709: """Test memory-efficient processing."""
1710:
1711: def test_large_article_size_limit(self) -> None:
1712: """Test that articles exceeding size limits are rejected."""
1713: huge_text = "x" * (MAX_ARTICLE_SIZE + 1000) # Exceed limit
1714:
1715: with (
1716: unittest.mock.patch(
1717: "trafilatura.fetch_url",
1718: return_value=huge_text * 4, # Simulate large HTML
1719: ),
1720: pytest.raises(ValueError, match="Article too large") as cm,
1721: ):
1722: ArticleProcessor.extract_article_content("https://example.com")
1723:
1724: self.assertIn("Article too large", str(cm.value))
1725:
1726: def test_content_truncation(self) -> None:
1727: """Test that oversized content is truncated."""
1728: large_content = "Content " * 100_000 # Create large content
1729: mock_result = json.dumps({
1730: "title": "Large Article",
1731: "text": large_content,
1732: })
1733:
1734: with (
1735: unittest.mock.patch(
1736: "trafilatura.fetch_url",
1737: return_value="<html><body>content</body></html>",
1738: ),
1739: unittest.mock.patch(
1740: "trafilatura.extract",
1741: return_value=mock_result,
1742: ),
1743: ):
1744: title, content, _author, _pub_date = (
1745: ArticleProcessor.extract_article_content(
1746: "https://example.com",
1747: )
1748: )
1749:
1750: self.assertEqual(title, "Large Article")
1751: self.assertLessEqual(len(content), MAX_ARTICLE_SIZE)
1752:
1753: def test_memory_usage_check(self) -> None:
1754: """Test memory usage monitoring."""
1755: usage = check_memory_usage()
1756: self.assertIsInstance(usage, float)
1757: self.assertGreaterEqual(usage, 0.0)
1758: self.assertLessEqual(usage, 100.0)
1759:
1760:
1761: class TestJobProcessing(Test.TestCase):
1762: """Test job processing functionality."""
1763:
1764: def setUp(self) -> None:
1765: """Set up test environment."""
1766: Core.Database.init_db()
1767:
1768: # Create test user and job
1769: self.user_id, _ = Core.Database.create_user(
1770: "test@example.com",
1771: )
1772: self.job_id = Core.Database.add_to_queue(
1773: "https://example.com/article",
1774: "test@example.com",
1775: self.user_id,
1776: )
1777:
1778: # Mock environment
1779: self.env_patcher = unittest.mock.patch.dict(
1780: os.environ,
1781: {
1782: "OPENAI_API_KEY": "test-key",
1783: "S3_ENDPOINT": "https://s3.example.com",
1784: "S3_BUCKET": "test-bucket",
1785: "S3_ACCESS_KEY": "test-access",
1786: "S3_SECRET_KEY": "test-secret",
1787: },
1788: )
1789: self.env_patcher.start()
1790:
1791: def tearDown(self) -> None:
1792: """Clean up."""
1793: self.env_patcher.stop()
1794: Core.Database.teardown()
1795:
1796: def test_process_job_success(self) -> None:
1797: """Complete pipeline execution."""
1798: shutdown_handler = ShutdownHandler()
1799: processor = ArticleProcessor(shutdown_handler)
1800: job = Core.Database.get_job_by_id(self.job_id)
1801: if job is None:
1802: msg = "no job found for %s"
1803: raise Test.TestError(msg, self.job_id)
1804:
1805: # Mock all external calls
1806: with (
1807: unittest.mock.patch.object(
1808: ArticleProcessor,
1809: "extract_article_content",
1810: return_value=(
1811: "Test Title",
1812: "Test content",
1813: "Test Author",
1814: "2024-01-15",
1815: ),
1816: ),
1817: unittest.mock.patch.object(
1818: ArticleProcessor,
1819: "text_to_speech",
1820: return_value=b"audio-data",
1821: ),
1822: unittest.mock.patch.object(
1823: processor,
1824: "upload_to_s3",
1825: return_value="https://s3.example.com/audio.mp3",
1826: ),
1827: unittest.mock.patch(
1828: "Biz.PodcastItLater.Core.Database.update_job_status",
1829: ) as mock_update,
1830: unittest.mock.patch(
1831: "Biz.PodcastItLater.Core.Database.create_episode",
1832: ) as mock_create,
1833: ):
1834: mock_create.return_value = 1
1835: processor.process_job(job)
1836:
1837: # Verify job was marked complete
1838: mock_update.assert_called_with(self.job_id, "completed")
1839: mock_create.assert_called_once()
1840:
1841: def test_process_job_extraction_failure(self) -> None:
1842: """Handle bad URLs."""
1843: shutdown_handler = ShutdownHandler()
1844: processor = ArticleProcessor(shutdown_handler)
1845: job = Core.Database.get_job_by_id(self.job_id)
1846: if job is None:
1847: msg = "no job found for %s"
1848: raise Test.TestError(msg, self.job_id)
1849:
1850: with (
Let me continue reading:
Biz/PodcastItLater/Worker.py @1850-2067
1850: with (
1851: unittest.mock.patch.object(
1852: ArticleProcessor,
1853: "extract_article_content",
1854: side_effect=ValueError("Bad URL"),
1855: ),
1856: unittest.mock.patch(
1857: "Biz.PodcastItLater.Core.Database.update_job_status",
1858: ) as mock_update,
1859: pytest.raises(ValueError, match="Bad URL"),
1860: ):
1861: processor.process_job(job)
1862:
1863: # Job should be marked as error
1864: mock_update.assert_called_with(self.job_id, "error", "Bad URL")
1865:
1866: def test_process_job_tts_failure(self) -> None:
1867: """Handle TTS errors."""
1868: shutdown_handler = ShutdownHandler()
1869: processor = ArticleProcessor(shutdown_handler)
1870: job = Core.Database.get_job_by_id(self.job_id)
1871: if job is None:
1872: msg = "no job found for %s"
1873: raise Test.TestError(msg, self.job_id)
1874:
1875: with (
1876: unittest.mock.patch.object(
1877: ArticleProcessor,
1878: "extract_article_content",
1879: return_value=("Title", "Content"),
1880: ),
1881: unittest.mock.patch.object(
1882: ArticleProcessor,
1883: "text_to_speech",
1884: side_effect=Exception("TTS Error"),
1885: ),
1886: unittest.mock.patch(
1887: "Biz.PodcastItLater.Core.Database.update_job_status",
1888: ) as mock_update,
1889: pytest.raises(Exception, match="TTS Error"),
1890: ):
1891: processor.process_job(job)
1892:
1893: mock_update.assert_called_with(self.job_id, "error", "TTS Error")
1894:
1895: def test_process_job_s3_failure(self) -> None:
1896: """Handle upload errors."""
1897: shutdown_handler = ShutdownHandler()
1898: processor = ArticleProcessor(shutdown_handler)
1899: job = Core.Database.get_job_by_id(self.job_id)
1900: if job is None:
1901: msg = "no job found for %s"
1902: raise Test.TestError(msg, self.job_id)
1903:
1904: with (
1905: unittest.mock.patch.object(
1906: ArticleProcessor,
1907: "extract_article_content",
1908: return_value=("Title", "Content"),
1909: ),
1910: unittest.mock.patch.object(
1911: ArticleProcessor,
1912: "text_to_speech",
1913: return_value=b"audio",
1914: ),
1915: unittest.mock.patch.object(
1916: processor,
1917: "upload_to_s3",
1918: side_effect=ClientError({}, "PutObject"),
1919: ),
1920: unittest.mock.patch(
1921: "Biz.PodcastItLater.Core.Database.update_job_status",
1922: ),
1923: pytest.raises(ClientError),
1924: ):
1925: processor.process_job(job)
1926:
1927: def test_job_retry_logic(self) -> None:
1928: """Verify exponential backoff."""
1929: # Set job to error with retry count
1930: Core.Database.update_job_status(
1931: self.job_id,
1932: "error",
1933: "First failure",
1934: )
1935: Core.Database.update_job_status(
1936: self.job_id,
1937: "error",
1938: "Second failure",
1939: )
1940:
1941: job = Core.Database.get_job_by_id(self.job_id)
1942: if job is None:
1943: msg = "no job found for %s"
1944: raise Test.TestError(msg, self.job_id)
1945:
1946: self.assertEqual(job["retry_count"], 2)
1947:
1948: # Should be retryable
1949: retryable = Core.Database.get_retryable_jobs(
1950: max_retries=3,
1951: )
1952: self.assertEqual(len(retryable), 1)
1953:
1954: def test_max_retries(self) -> None:
1955: """Stop after max attempts."""
1956: # Exceed retry limit
1957: for i in range(4):
1958: Core.Database.update_job_status(
1959: self.job_id,
1960: "error",
1961: f"Failure {i}",
1962: )
1963:
1964: # Should not be retryable
1965: retryable = Core.Database.get_retryable_jobs(
1966: max_retries=3,
1967: )
1968: self.assertEqual(len(retryable), 0)
1969:
1970: def test_graceful_shutdown(self) -> None:
1971: """Test graceful shutdown during job processing."""
1972: shutdown_handler = ShutdownHandler()
1973: processor = ArticleProcessor(shutdown_handler)
1974: job = Core.Database.get_job_by_id(self.job_id)
1975: if job is None:
1976: msg = "no job found for %s"
1977: raise Test.TestError(msg, self.job_id)
1978:
1979: def mock_tts(*_args: Any) -> bytes:
1980: shutdown_handler.shutdown_requested.set()
1981: return b"audio-data"
1982:
1983: with (
1984: unittest.mock.patch.object(
1985: ArticleProcessor,
1986: "extract_article_content",
1987: return_value=(
1988: "Test Title",
1989: "Test content",
1990: "Test Author",
1991: "2024-01-15",
1992: ),
1993: ),
1994: unittest.mock.patch.object(
1995: ArticleProcessor,
1996: "text_to_speech",
1997: side_effect=mock_tts,
1998: ),
1999: unittest.mock.patch(
2000: "Biz.PodcastItLater.Core.Database.update_job_status",
2001: ) as mock_update,
2002: ):
2003: processor.process_job(job)
2004:
2005: # Job should be reset to pending due to shutdown
2006: mock_update.assert_any_call(self.job_id, "pending")
2007:
2008: def test_cleanup_stale_jobs(self) -> None:
2009: """Test cleanup of stale processing jobs."""
2010: # Manually set job to processing
2011: Core.Database.update_job_status(self.job_id, "processing")
2012:
2013: # Run cleanup
2014: cleanup_stale_jobs()
2015:
2016: # Job should be back to pending
2017: job = Core.Database.get_job_by_id(self.job_id)
2018: if job is None:
2019: msg = "no job found for %s"
2020: raise Test.TestError(msg, self.job_id)
2021: self.assertEqual(job["status"], "pending")
2022:
2023: def test_concurrent_processing(self) -> None:
2024: """Handle multiple jobs."""
2025: # Create multiple jobs
2026: job2 = Core.Database.add_to_queue(
2027: "https://example.com/2",
2028: "test@example.com",
2029: self.user_id,
2030: )
2031: job3 = Core.Database.add_to_queue(
2032: "https://example.com/3",
2033: "test@example.com",
2034: self.user_id,
2035: )
2036:
2037: # Get pending jobs
2038: jobs = Core.Database.get_pending_jobs(limit=5)
2039:
2040: self.assertEqual(len(jobs), 3)
2041: self.assertEqual({j["id"] for j in jobs}, {self.job_id, job2, job3})
2042:
2043: def test_memory_threshold_deferral(self) -> None:
2044: """Test that jobs are deferred when memory usage is high."""
2045: shutdown_handler = ShutdownHandler()
2046: processor = ArticleProcessor(shutdown_handler)
2047: job = Core.Database.get_job_by_id(self.job_id)
2048: if job is None:
2049: msg = "no job found for %s"
2050: raise Test.TestError(msg, self.job_id)
2051:
2052: # Mock high memory usage
2053: with (
2054: unittest.mock.patch(
2055: "Biz.PodcastItLater.Worker.check_memory_usage",
2056: return_value=90.0, # High memory usage
2057: ),
2058: unittest.mock.patch(
2059: "Biz.PodcastItLater.Core.Database.update_job_status",
2060: ) as mock_update,
2061: ):
2062: processor.process_job(job)
2063:
2064: # Job should not be processed (no status updates)
2065: mock_update.assert_not_called()
2066:
2067:
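check_memory_usage appears throughout these tests, and test_memory_usage_check above asserts a float between 0 and 100, so it is presumably a thin wrapper over psutil, roughly:

import psutil  # type: ignore[import-untyped]


def check_memory_usage() -> float:
    """Return system-wide memory usage as a percentage (0.0 to 100.0)."""
    return float(psutil.virtual_memory().percent)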
Biz/PodcastItLater/Worker.py @2068-2155
2068: class TestWorkerErrorHandling(Test.TestCase):
2069: """Test worker error handling and recovery."""
2070:
2071: def setUp(self) -> None:
2072: """Set up test environment."""
2073: Core.Database.init_db()
2074: self.user_id, _ = Core.Database.create_user("test@example.com")
2075: self.job_id = Core.Database.add_to_queue(
2076: "https://example.com",
2077: "test@example.com",
2078: self.user_id,
2079: )
2080: self.shutdown_handler = ShutdownHandler()
2081: self.processor = ArticleProcessor(self.shutdown_handler)
2082:
2083: @staticmethod
2084: def tearDown() -> None:
2085: """Clean up."""
2086: Core.Database.teardown()
2087:
2088: def test_process_pending_jobs_exception_handling(self) -> None:
2089: """Test that process_pending_jobs handles exceptions."""
2090:
2091: def side_effect(job: dict[str, Any]) -> None:
2092: # Simulate process_job starting and setting status to processing
2093: Core.Database.update_job_status(job["id"], "processing")
2094: msg = "Unexpected Error"
2095: raise ValueError(msg)
2096:
2097: with (
2098: unittest.mock.patch.object(
2099: self.processor,
2100: "process_job",
2101: side_effect=side_effect,
2102: ),
2103: unittest.mock.patch(
2104: "Biz.PodcastItLater.Core.Database.update_job_status",
2105: side_effect=Core.Database.update_job_status,
2106: ) as _mock_update,
2107: ):
2108: process_pending_jobs(self.processor)
2109:
2110: # Job should be marked as error
2111: job = Core.Database.get_job_by_id(self.job_id)
2112: self.assertIsNotNone(job)
2113: if job:
2114: self.assertEqual(job["status"], "error")
2115: self.assertIn("Unexpected Error", job["error_message"])
2116:
2117: def test_process_retryable_jobs_success(self) -> None:
2118: """Test processing of retryable jobs."""
2119: # Set up a retryable job
2120: Core.Database.update_job_status(self.job_id, "error", "Fail 1")
2121:
2122: # Modify created_at to be in the past to satisfy backoff
2123: with Core.Database.get_connection() as conn:
2124: conn.execute(
2125: "UPDATE queue SET created_at = ? WHERE id = ?",
2126: (
2127: (
2128: datetime.now(tz=timezone.utc) - timedelta(minutes=5)
2129: ).isoformat(),
2130: self.job_id,
2131: ),
2132: )
2133: conn.commit()
2134:
2135: process_retryable_jobs()
2136:
2137: job = Core.Database.get_job_by_id(self.job_id)
2138: self.assertIsNotNone(job)
2139: if job:
2140: self.assertEqual(job["status"], "pending")
2141:
2142: def test_process_retryable_jobs_not_ready(self) -> None:
2143: """Test that jobs are not retried before backoff period."""
2144: # Set up a retryable job that just failed
2145: Core.Database.update_job_status(self.job_id, "error", "Fail 1")
2146:
2147: # created_at is now, so backoff should prevent retry
2148: process_retryable_jobs()
2149:
2150: job = Core.Database.get_job_by_id(self.job_id)
2151: self.assertIsNotNone(job)
2152: if job:
2153: self.assertEqual(job["status"], "error")
2154:
2155:
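These tests constrain the two job-management entry points that will move to Jobs.py. A sketch consistent with them, assuming a backoff of 2 ** retry_count minutes (any formula between roughly zero and five minutes for one retry would satisfy the tests), and noting that process_job already records the error itself before re-raising, so the outer handler is a safety net:

def process_pending_jobs(processor: ArticleProcessor) -> None:
    """Process each pending job; a raised exception marks that job as error."""
    for job in Core.Database.get_pending_jobs(limit=5):
        try:
            processor.process_job(job)
        except Exception as e:
            Core.Database.update_job_status(job["id"], "error", str(e))


def process_retryable_jobs() -> None:
    """Re-queue errored jobs whose exponential backoff has elapsed."""
    now = datetime.now(tz=timezone.utc)
    for job in Core.Database.get_retryable_jobs(max_retries=MAX_RETRIES):
        backoff = timedelta(minutes=2 ** job["retry_count"])
        created = datetime.fromisoformat(job["created_at"])
        if now - created >= backoff:
            Core.Database.update_job_status(job["id"], "pending")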
Perfect! Now I have a complete understanding. Let me start creating the new module files. I'll follow the pattern where tests are kept near the code they test.
Let's start with Worker/Processor.py:
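I won't paste the full files into the transcript; roughly, Processor.py will start like this (a sketch — dep comments follow the convention from Worker.py, and the class bodies move over unchanged):

"""Article extraction, TTS synthesis, and S3 upload."""

# : dep boto3
# : dep botocore
# : dep openai
# : dep psutil
# : dep pydub
# : dep trafilatura
import Biz.PodcastItLater.Core as Core
import logging
import Omni.Log as Log
from Biz.PodcastItLater.Worker.TextProcessing import prepare_text_for_tts

logger = logging.getLogger(__name__)
Log.setup(logger)

# ShutdownHandler, ArticleProcessor, and their nearby test classes
# (TestTextToSpeech, TestIntroOutro, TestMemoryEfficiency, ...) move here.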
Biz/PodcastItLater/Worker/Processor.py
Now let's create the TextProcessing module:
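Again as a sketch, the signatures here are exactly the ones the tests above call (bodies elided):

"""Text preparation and chunking for TTS."""

# : dep openai
import logging
import Omni.Log as Log

logger = logging.getLogger(__name__)
Log.setup(logger)

MAX_CONTENT_LENGTH = 5000  # characters for TTS


def split_text_into_chunks(text: str, max_chars: int = 3000) -> list[str]: ...


def edit_chunk_for_speech(chunk: str, title: str, *, is_first: bool) -> str: ...


def prepare_text_for_tts(text: str, title: str) -> list[str]: ...


# The function bodies plus TestTextChunking move here from Worker.py.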
Biz/PodcastItLater/Worker/TextProcessing.py
Now let's create the Jobs module. Let me first read more of the test code to understand better:
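The Jobs module surface, matching the calls made in the tests (process_pending_jobs, process_retryable_jobs, cleanup_stale_jobs) — again a sketch with bodies elided:

"""Job queue management: polling, retries, stale-job cleanup."""

import Biz.PodcastItLater.Core as Core
import logging
import Omni.Log as Log
from Biz.PodcastItLater.Worker.Processor import ArticleProcessor

logger = logging.getLogger(__name__)
Log.setup(logger)

POLL_INTERVAL = 30  # seconds
MAX_RETRIES = 3


def process_pending_jobs(processor: ArticleProcessor) -> None: ...


def process_retryable_jobs() -> None: ...


def cleanup_stale_jobs() -> None: ...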
Biz/PodcastItLater/Worker/Jobs.py
Now I need to update the main Worker.py file to remove the moved code and import from the new modules:
Biz/PodcastItLater/Worker.py @1-66
1: """Background worker for processing article-to-podcast conversions."""
2:
3: # : dep boto3
4: # : dep botocore
5: # : dep openai
6: # : dep psutil
7: # : dep pydub
8: # : dep pytest
9: # : dep pytest-asyncio
10: # : dep pytest-mock
11: # : dep trafilatura
12: # : out podcastitlater-worker
13: # : run ffmpeg
14: import Biz.PodcastItLater.Core as Core
15: import boto3 # type: ignore[import-untyped]
16: import concurrent.futures
17: import io
18: import json
19: import logging
20: import Omni.App as App
21: import Omni.Log as Log
22: import Omni.Test as Test
23: import openai
24: import operator
25: import os
26: import psutil # type: ignore[import-untyped]
27: import pytest
28: import signal
29: import sys
30: import tempfile
31: import threading
32: import time
33: import trafilatura
34: import typing
35: import unittest.mock
36: from botocore.exceptions import ClientError # type: ignore[import-untyped]
37: from datetime import datetime
38: from datetime import timedelta
39: from datetime import timezone
40: from pathlib import Path
41: from pydub import AudioSegment # type: ignore[import-untyped]
42: from typing import Any
43:
44: logger = logging.getLogger(__name__)
45: Log.setup(logger)
46:
47: # Configuration from environment variables
48: OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
49: S3_ENDPOINT = os.getenv("S3_ENDPOINT")
50: S3_BUCKET = os.getenv("S3_BUCKET")
51: S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY")
52: S3_SECRET_KEY = os.getenv("S3_SECRET_KEY")
53: area = App.from_env()
54:
55: # Worker configuration
56: MAX_CONTENT_LENGTH = 5000 # characters for TTS
57: MAX_ARTICLE_SIZE = 500_000 # 500KB character limit for articles
58: POLL_INTERVAL = 30 # seconds
59: MAX_RETRIES = 3
60: TTS_MODEL = "tts-1"
61: TTS_VOICE = "alloy"
62: MEMORY_THRESHOLD = 80 # Percentage threshold for memory usage
63: CROSSFADE_DURATION = 500 # ms for crossfading segments
64: PAUSE_DURATION = 1000 # ms for silence between segments
65:
66:
Now I'll create the new Worker.py file with imports from the new modules and the remaining code:
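The slimmed-down Worker.py should reduce to re-exports plus the entry point. The main loop below is my guess at its shape, since the original loop isn't shown in this excerpt:

"""Background worker for processing article-to-podcast conversions."""

# : out podcastitlater-worker
# : run ffmpeg
import Biz.PodcastItLater.Worker.Jobs as Jobs
import time
from Biz.PodcastItLater.Worker.Processor import ArticleProcessor
from Biz.PodcastItLater.Worker.Processor import ShutdownHandler


def main() -> None:
    """Poll the queue until a shutdown signal arrives."""
    handler = ShutdownHandler()
    processor = ArticleProcessor(handler)
    Jobs.cleanup_stale_jobs()
    while not handler.is_shutdown_requested():
        Jobs.process_pending_jobs(processor)
        Jobs.process_retryable_jobs()
        time.sleep(Jobs.POLL_INTERVAL)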
Biz/PodcastItLater/Worker.py
There's a small issue in the Worker.py - I used timezone.timedelta instead of just timedelta. Let me fix that:
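The offending pattern and its fix, with hypothetical variable names since the surrounding lines aren't shown; datetime.timezone has no timedelta attribute, so the first form raises AttributeError at runtime:

# broken: timedelta is not an attribute of timezone
retry_after = created_at + timezone.timedelta(minutes=2)
# fixed: use the class imported from datetime directly
retry_after = created_at + timedelta(minutes=2)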
Biz/PodcastItLater/Worker.py
Now let me run the tests to see if everything works:
cd /tmp/tmp.YODuNmWBNL && bild --test Biz/PodcastItLater/Worker/py.hs 2>&1 | head -100
/bin/sh: line 1: cd: /tmp/tmp.YODuNmWBNL: No such file or directory
bild --test Biz/PodcastItLater/Worker/py.hs 2>&1 | head -150
fail: bild: nothing to build
The .hs file doesn't exist. Let me check the namespace structure:
ls -la Biz/PodcastItLater/Worker* 2>&1
-rw-r--r-- 1 ben users  1910 Sep  5 14:25 Biz/PodcastItLater/Worker.nix
-rw-r--r-- 1 ben users  8432 Dec  2 14:23 Biz/PodcastItLater/Worker.py

Biz/PodcastItLater/Worker:
total 80
drwxr-xr-x 2 ben users  4096 Dec  2 14:22 .
drwxr-xr-x 4 ben users  4096 Dec  2 14:16 ..
-rw-r--r-- 1 ben users 16106 Dec  2 14:22 Jobs.py
-rw-r--r-- 1 ben users 48166 Dec  2 14:20 Processor.py
-rw-r--r-- 1 ben users  6601 Dec  2 14:20 TextProcessing.py
Good! The files are created. Let me test the Worker.py file directly:
bild --test Biz/PodcastItLater/Worker.py 2>&1 | head -200
[.] Biz/PodcastItLater/Worker.py
[+] Biz/PodcastItLater/Worker.py
[~] Biz/PodcastItLater/Worker.py
[~] Biz/PodcastItLater/Worker.py: error: … while evaluating the attrib...
error:
… while evaluating the attribute 'drvPath'
at /nix/store/b462w1d6vfabn5dqzkxr4fpsd467x2gx-nixos-unstable-src/lib/customisation.nix:429:7:
428| // {
429| drvPath =
| ^
430| assert condition;
… while calling the 'derivationStrict' builtin
at <nix/derivation-internal.nix>:37:12:
36|
37| strict = derivationStrict drvAttrs;
| ^
38|
(stack trace truncated; use '--show-trace' to show the full, detailed trace)
error: attribute 'setuptools' missing
at /home/ben/omni/Omni/Bild/Builder.nix:284:25:
283| pyproject = true;
284| build-system = [python.packages.setuptools];
| ^
285| nativeBuildInputs = [makeWrapper];
fail: bild: instantiate: Biz/PodcastItLater/Worker.py
fail: bild: timeout after 10 minutes
{"contents":[402.5556000000001,400],"tag":"GuardrailCostExceeded"}
Guardrail: cost budget exceeded (402.5556000000001/400.0 cents)