Split Biz/PodcastItLater/Worker.py into smaller modules

t-230 · WorkTask
Biz/PodcastItLater/Worker.py
Created 3 months ago · Updated 3 months ago

Description


Split Biz/PodcastItLater/Worker.py (2199 lines) into smaller modules.

Important: keep tests next to the code they are testing. For example: a Feature class and its TestFeature test cases should live in the same file.
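That convention can be sketched as follows. Feature and its greet method are hypothetical, and plain unittest stands in for the repo's Omni.Test.TestCase (which the transcript below shows is an alias of unittest.TestCase):

```python
import unittest


class Feature:
    """A toy feature class (hypothetical, for illustration only)."""

    def greet(self, name: str) -> str:
        return f"Hello {name}"


# The test class lives in the same file, directly after the class under test.
class TestFeature(unittest.TestCase):
    """Test the Feature class."""

    def test_greet(self) -> None:
        self.assertEqual(Feature().greet("Ben"), "Hello Ben")
```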

Suggested Target Structure

Create these new files in Biz/PodcastItLater/:

1. Worker/Processor.py (~700 lines)

Move the ArticleProcessor class and its methods (lines 103-720):

  • class ArticleProcessor with all its methods for:
      • Article extraction
      • Text-to-speech conversion
      • Audio processing
      • S3 upload
  • TestArticleExtraction
  • TestTextToSpeech
  • TestIntroOutro
  • create test() and main() functions to integrate with the repo's common patterns
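The test()/main() scaffolding each new module needs follows the pattern visible elsewhere in the repo (Omni.Test wrapping unittest). A stdlib-only sketch, with a placeholder test case standing in for the real ones being moved:

```python
import sys
import unittest


class TestArticleExtraction(unittest.TestCase):
    """Placeholder standing in for the real test cases moved into this module."""

    def test_placeholder(self) -> None:
        self.assertTrue(True)


def test() -> unittest.TestResult:
    """Run this module's tests.

    The repo's real pattern calls Omni.Test.run(area, tests); plain
    unittest is used here so the sketch is self-contained.
    """
    loader = unittest.TestLoader()
    suite = unittest.TestSuite(
        loader.loadTestsFromTestCase(tc) for tc in [TestArticleExtraction]
    )
    return unittest.TextTestRunner().run(suite)


def main() -> None:
    """Entry point: run tests when asked, otherwise do the module's work."""
    if "test" in sys.argv:
        sys.exit(0 if test().wasSuccessful() else 1)


if __name__ == "__main__":
    main()
```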

2. Worker/TextProcessing.py (~150 lines)

Move text processing functions:

  • prepare_text_for_tts (lines 721-760)
  • split_text_into_chunks (lines 761-798)
  • edit_chunk_for_speech (lines 799-873)
  • TestTextChunking
  • create test() and main() functions to integrate with the repo's common patterns
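For orientation, here is a naive version of what a chunker like split_text_into_chunks does; this is an illustration only, not the actual implementation at lines 761-798:

```python
def split_text_into_chunks(text: str, max_chars: int = 4000) -> list[str]:
    """Greedily pack whole words into chunks of at most max_chars.

    A single word longer than max_chars becomes its own oversized chunk,
    the edge case TestTextChunking exercises.
    """
    chunks: list[str] = []
    current: list[str] = []
    length = 0
    for word in text.split():
        # +1 accounts for the joining space when the chunk is non-empty
        added = len(word) + (1 if current else 0)
        if length + added > max_chars and current:
            chunks.append(" ".join(current))
            current, length = [], 0
            added = len(word)
        current.append(word)
        length += added
    if current:
        chunks.append(" ".join(current))
    return chunks
```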

3. Worker/Jobs.py (~150 lines)

Move job management functions:

  • parse_datetime_with_timezone (lines 874-889)
  • should_retry_job (lines 890-906)
  • process_pending_jobs (lines 907-938)
  • process_retryable_jobs (lines 939-957)
  • check_memory_usage (lines 958-968)
  • cleanup_stale_jobs (lines 969-989)
  • main_loop (lines 990-1035)
  • TestJobProcessing
  • create test() and main() functions to integrate with the repo's common patterns
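Schematically, the retry and polling logic being moved looks something like this. The constants mirror Worker.py's MAX_RETRIES and POLL_INTERVAL; the function bodies are simplified stand-ins for the real implementations at the line ranges above:

```python
import threading
from collections.abc import Callable

MAX_RETRIES = 3  # mirrors the constant in Worker.py
POLL_INTERVAL = 30  # seconds, mirrors Worker.py


def should_retry_job(retry_count: int, max_retries: int = MAX_RETRIES) -> bool:
    """A failed job is retried until it has used up its attempts."""
    return retry_count < max_retries


def main_loop(
    shutdown: threading.Event,
    process_once: Callable[[], None],
    poll_interval: float = POLL_INTERVAL,
) -> None:
    """Poll for work until shutdown is requested (schematic only)."""
    while not shutdown.is_set():
        process_once()
        # Event.wait doubles as an interruptible sleep: it returns early
        # the moment shutdown is set, so the loop exits promptly.
        shutdown.wait(timeout=poll_interval)
```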

4. Keep in Worker.py (~150 lines)

  • Imports and configuration (lines 1-66)
  • ShutdownHandler class (lines 67-102)
  • move() function (lines 1036-1051)
  • test() and main() functions (lines 2179-2199)
  • TestMemoryEfficiency
  • TestWorkerErrorHandling

Instructions

1. Create the new files with proper imports
2. Update Worker.py to import from the new modules
3. Run: bild --test Biz/PodcastItLater/Worker.py
4. Verify all tests pass

Timeline (97) · $3.89 · 1.24M tokens

🤖 Claimed [engineer] · 3 months ago
🔄 [engineer] Open → InProgress · 3 months ago
▶️ Running [engineer] {"started_at":"2025-12-02 19:14:39.765286743 UTC","completed_at":"2025-12-02 19:33:26.894383657 UTC","cost_cents":403} · 3 months ago
💭 Thought [engineer] · 3 months ago

I'll start by exploring the current Worker.py file to understand its structure and dependencies.

Read Biz/PodcastItLater/Worker.py @1-100
100 lines
1: """Background worker for processing article-to-podcast conversions."""
2: 
3: # : dep boto3
4: # : dep botocore
5: # : dep openai
6: # : dep psutil
7: # : dep pydub
8: # : dep pytest
9: # : dep pytest-asyncio
10: # : dep pytest-mock
11: # : dep trafilatura
12: # : out podcastitlater-worker
13: # : run ffmpeg
14: import Biz.PodcastItLater.Core as Core
15: import boto3  # type: ignore[import-untyped]
16: import concurrent.futures
17: import io
18: import json
19: import logging
20: import Omni.App as App
21: import Omni.Log as Log
22: import Omni.Test as Test
23: import openai
24: import operator
25: import os
26: import psutil  # type: ignore[import-untyped]
27: import pytest
28: import signal
29: import sys
30: import tempfile
31: import threading
32: import time
33: import trafilatura
34: import typing
35: import unittest.mock
36: from botocore.exceptions import ClientError  # type: ignore[import-untyped]
37: from datetime import datetime
38: from datetime import timedelta
39: from datetime import timezone
40: from pathlib import Path
41: from pydub import AudioSegment  # type: ignore[import-untyped]
42: from typing import Any
43: 
44: logger = logging.getLogger(__name__)
45: Log.setup(logger)
46: 
47: # Configuration from environment variables
48: OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
49: S3_ENDPOINT = os.getenv("S3_ENDPOINT")
50: S3_BUCKET = os.getenv("S3_BUCKET")
51: S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY")
52: S3_SECRET_KEY = os.getenv("S3_SECRET_KEY")
53: area = App.from_env()
54: 
55: # Worker configuration
56: MAX_CONTENT_LENGTH = 5000  # characters for TTS
57: MAX_ARTICLE_SIZE = 500_000  # 500KB character limit for articles
58: POLL_INTERVAL = 30  # seconds
59: MAX_RETRIES = 3
60: TTS_MODEL = "tts-1"
61: TTS_VOICE = "alloy"
62: MEMORY_THRESHOLD = 80  # Percentage threshold for memory usage
63: CROSSFADE_DURATION = 500  # ms for crossfading segments
64: PAUSE_DURATION = 1000  # ms for silence between segments
65: 
66: 
67: class ShutdownHandler:
68:     """Handles graceful shutdown of the worker."""
69: 
70:     def __init__(self) -> None:
71:         """Initialize shutdown handler."""
72:         self.shutdown_requested = threading.Event()
73:         self.current_job_id: int | None = None
74:         self.lock = threading.Lock()
75: 
76:         # Register signal handlers
77:         signal.signal(signal.SIGTERM, self._handle_signal)
78:         signal.signal(signal.SIGINT, self._handle_signal)
79: 
80:     def _handle_signal(self, signum: int, _frame: Any) -> None:
81:         """Handle shutdown signals."""
82:         logger.info(
83:             "Received signal %d, initiating graceful shutdown...",
84:             signum,
85:         )
86:         self.shutdown_requested.set()
87: 
88:     def is_shutdown_requested(self) -> bool:
89:         """Check if shutdown has been requested."""
90:         return self.shutdown_requested.is_set()
91: 
92:     def set_current_job(self, job_id: int | None) -> None:
93:         """Set the currently processing job."""
94:         with self.lock:
95:             self.current_job_id = job_id
96: 
97:     def get_current_job(self) -> int | None:
98:         """Get the currently processing job."""
99:         with self.lock:
100:             return self.current_job_id
Read Biz/PodcastItLater/Worker.py @100-200
101 lines
100:             return self.current_job_id
101: 
102: 
103: class ArticleProcessor:
104:     """Handles the complete article-to-podcast conversion pipeline."""
105: 
106:     def __init__(self, shutdown_handler: ShutdownHandler) -> None:
107:         """Initialize the processor with required services.
108: 
109:         Raises:
110:             ValueError: If OPENAI_API_KEY environment variable is not set.
111:         """
112:         if not OPENAI_API_KEY:
113:             msg = "OPENAI_API_KEY environment variable is required"
114:             raise ValueError(msg)
115: 
116:         self.openai_client: openai.OpenAI = openai.OpenAI(
117:             api_key=OPENAI_API_KEY,
118:         )
119:         self.shutdown_handler = shutdown_handler
120: 
121:         # Initialize S3 client for Digital Ocean Spaces
122:         if all([S3_ENDPOINT, S3_BUCKET, S3_ACCESS_KEY, S3_SECRET_KEY]):
123:             self.s3_client: Any = boto3.client(
124:                 "s3",
125:                 endpoint_url=S3_ENDPOINT,
126:                 aws_access_key_id=S3_ACCESS_KEY,
127:                 aws_secret_access_key=S3_SECRET_KEY,
128:             )
129:         else:
130:             logger.warning("S3 configuration incomplete, uploads will fail")
131:             self.s3_client = None
132: 
133:     @staticmethod
134:     def extract_article_content(
135:         url: str,
136:     ) -> tuple[str, str, str | None, str | None]:
137:         """Extract title, content, author, and date from article URL.
138: 
139:         Returns:
140:             tuple: (title, content, author, publication_date)
141: 
142:         Raises:
143:             ValueError: If content cannot be downloaded, extracted, or large.
144:         """
145:         try:
146:             downloaded = trafilatura.fetch_url(url)
147:             if not downloaded:
148:                 msg = f"Failed to download content from {url}"
149:                 raise ValueError(msg)  # noqa: TRY301
150: 
151:             # Check size before processing
152:             if (
153:                 len(downloaded) > MAX_ARTICLE_SIZE * 4
154:             ):  # Rough HTML to text ratio
155:                 msg = f"Article too large: {len(downloaded)} bytes"
156:                 raise ValueError(msg)  # noqa: TRY301
157: 
158:             # Extract with metadata
159:             result = trafilatura.extract(
160:                 downloaded,
161:                 include_comments=False,
162:                 include_tables=False,
163:                 with_metadata=True,
164:                 output_format="json",
165:             )
166: 
167:             if not result:
168:                 msg = f"Failed to extract content from {url}"
169:                 raise ValueError(msg)  # noqa: TRY301
170: 
171:             data = json.loads(result)
172: 
173:             title = data.get("title", "Untitled Article")
174:             content = data.get("text", "")
175:             author = data.get("author")
176:             pub_date = data.get("date")
177: 
178:             if not content:
179:                 msg = f"No content extracted from {url}"
180:                 raise ValueError(msg)  # noqa: TRY301
181: 
182:             # Enforce content size limit
183:             if len(content) > MAX_ARTICLE_SIZE:
184:                 logger.warning(
185:                     "Article content truncated from %d to %d characters",
186:                     len(content),
187:                     MAX_ARTICLE_SIZE,
188:                 )
189:                 content = content[:MAX_ARTICLE_SIZE]
190: 
191:             logger.info(
192:                 "Extracted article: %s (%d chars, author: %s, date: %s)",
193:                 title,
194:                 len(content),
195:                 author or "unknown",
196:                 pub_date or "unknown",
197:             )
198:         except Exception:
199:             logger.exception("Failed to extract content from %s", url)
200:             raise
💭 Thought [engineer] · 3 months ago

Let me continue reading to understand the full structure:

Find ^class Test
407 lines
./Omni/Repl.py-212-    def edit(self) -> None:
./Omni/Repl.py-213-        """Edit the current namespace."""
./Omni/Repl.py-214-        edit_file(self.ns, self.path, self.editor)
./Omni/Repl.py-215-
./Omni/Repl.py-216-
./Omni/Repl.py:217:class TestCustomRepl(unittest.TestCase):
./Omni/Repl.py-218-    """Test the CustomRepl functionality."""
./Omni/Repl.py-219-
./Omni/Repl.py-220-    def setUp(self) -> None:
./Omni/Repl.py-221-        """Create a CustomRepl for testing."""
./Omni/Repl.py-222-        ns = __name__
--
./Omni/Llamacpp.py-16-import os
./Omni/Llamacpp.py-17-import sys
./Omni/Llamacpp.py-18-import unittest
./Omni/Llamacpp.py-19-
./Omni/Llamacpp.py-20-
./Omni/Llamacpp.py:21:class TestLlamaCpp(unittest.TestCase):
./Omni/Llamacpp.py-22-    """Test that llama.cpp is available."""
./Omni/Llamacpp.py-23-
./Omni/Llamacpp.py-24-    def test_in_path(self) -> None:
./Omni/Llamacpp.py-25-        """Test that llama.cpp is in $PATH."""
./Omni/Llamacpp.py-26-        self.assertIn("llama-cpp", os.environ.get("PATH", ""))
--
./Omni/Bild/Example.py-41-def hello(name: str) -> str:
./Omni/Bild/Example.py-42-    """Say hello."""
./Omni/Bild/Example.py-43-    return f"Hello {name}"
./Omni/Bild/Example.py-44-
./Omni/Bild/Example.py-45-
./Omni/Bild/Example.py:46:class TestExample(Test.TestCase):
./Omni/Bild/Example.py-47-    """Test the Example module."""
./Omni/Bild/Example.py-48-
./Omni/Bild/Example.py-49-    def test_hello(self) -> None:
./Omni/Bild/Example.py-50-        """Test `hello` function."""
./Omni/Bild/Example.py-51-        self.assertEqual("Hello Ben", hello("Ben"))
--
./Omni/Test.py-7-import unittest
./Omni/Test.py-8-
./Omni/Test.py-9-TestCase = unittest.TestCase
./Omni/Test.py-10-
./Omni/Test.py-11-
./Omni/Test.py:12:class TestError(Exception):
./Omni/Test.py-13-    """When the test environment or harness encounters a problem."""
./Omni/Test.py-14-
./Omni/Test.py-15-
./Omni/Test.py-16-def run(area: App.Area, tests: list[typing.Any]) -> None:
./Omni/Test.py-17-    """Run the given tests with loglevel determined by area."""
--
./Biz/PodcastItLater/Web.py-1917-    def tearDown() -> None:
./Biz/PodcastItLater/Web.py-1918-        """Clean up test database."""
./Biz/PodcastItLater/Web.py-1919-        Core.Database.teardown()
./Biz/PodcastItLater/Web.py-1920-
./Biz/PodcastItLater/Web.py-1921-
./Biz/PodcastItLater/Web.py:1922:class TestDurationFormatting(Test.TestCase):
./Biz/PodcastItLater/Web.py-1923-    """Test duration formatting functionality."""
./Biz/PodcastItLater/Web.py-1924-
./Biz/PodcastItLater/Web.py-1925-    def test_format_duration_minutes_only(self) -> None:
./Biz/PodcastItLater/Web.py-1926-        """Test formatting durations less than an hour."""
./Biz/PodcastItLater/Web.py-1927-        self.assertEqual(UI.format_duration(60), "1m")
--
./Biz/PodcastItLater/Web.py-1948-        self.assertEqual(UI.format_duration(None), "Unknown")
./Biz/PodcastItLater/Web.py-1949-        self.assertEqual(UI.format_duration(0), "Unknown")
./Biz/PodcastItLater/Web.py-1950-        self.assertEqual(UI.format_duration(-100), "Unknown")
./Biz/PodcastItLater/Web.py-1951-
./Biz/PodcastItLater/Web.py-1952-
./Biz/PodcastItLater/Web.py:1953:class TestAuthentication(BaseWebTest):
./Biz/PodcastItLater/Web.py-1954-    """Test authentication functionality."""
./Biz/PodcastItLater/Web.py-1955-
./Biz/PodcastItLater/Web.py-1956-    def test_login_new_user_active(self) -> None:
./Biz/PodcastItLater/Web.py-1957-        """New users should be created with active status."""
./Biz/PodcastItLater/Web.py-1958-        response = self.client.post("/login", data={"email": "new@example.com"})
--
./Biz/PodcastItLater/Web.py-2070-        )
./Biz/PodcastItLater/Web.py-2071-
./Biz/PodcastItLater/Web.py-2072-        self.assertIn("Please login first", response.text)
./Biz/PodcastItLater/Web.py-2073-
./Biz/PodcastItLater/Web.py-2074-
./Biz/PodcastItLater/Web.py:2075:class TestArticleSubmission(BaseWebTest):
./Biz/PodcastItLater/Web.py-2076-    """Test article submission functionality."""
./Biz/PodcastItLater/Web.py-2077-
./Biz/PodcastItLater/Web.py-2078-    def setUp(self) -> None:
./Biz/PodcastItLater/Web.py-2079-        """Set up test client with logged-in user."""
./Biz/PodcastItLater/Web.py-2080-        super().setUp()
--
./Biz/PodcastItLater/Web.py-2146-        # Should return HTML fragment, not full page
./Biz/PodcastItLater/Web.py-2147-        self.assertNotIn("<!DOCTYPE", response.text)
./Biz/PodcastItLater/Web.py-2148-        self.assertIn("<div", response.text)
./Biz/PodcastItLater/Web.py-2149-
./Biz/PodcastItLater/Web.py-2150-
./Biz/PodcastItLater/Web.py:2151:class TestRSSFeed(BaseWebTest):
./Biz/PodcastItLater/Web.py-2152-    """Test RSS feed generation."""
./Biz/PodcastItLater/Web.py-2153-
./Biz/PodcastItLater/Web.py-2154-    def setUp(self) -> None:
./Biz/PodcastItLater/Web.py-2155-        """Set up test client and create test data."""
./Biz/PodcastItLater/Web.py-2156-        super().setUp()
--
./Biz/PodcastItLater/Web.py-2275-        self.assertEqual(response_xml.status_code, 200)
./Biz/PodcastItLater/Web.py-2276-        self.assertEqual(response_rss.status_code, 200)
./Biz/PodcastItLater/Web.py-2277-        self.assertEqual(response_xml.text, response_rss.text)
./Biz/PodcastItLater/Web.py-2278-
./Biz/PodcastItLater/Web.py-2279-
./Biz/PodcastItLater/Web.py:2280:class TestAdminInterface(BaseWebTest):
./Biz/PodcastItLater/Web.py-2281-    """Test admin interface functionality."""
./Biz/PodcastItLater/Web.py-2282-
./Biz/PodcastItLater/Web.py-2283-    def setUp(self) -> None:
./Biz/PodcastItLater/Web.py-2284-        """Set up test client with logged-in user."""
./Biz/PodcastItLater/Web.py-2285-        super().setUp()
--
./Biz/PodcastItLater/Web.py-2386-        # Should show status counts
./Biz/PodcastItLater/Web.py-2387-        self.assertIn("ERROR: 1", response.text)
./Biz/PodcastItLater/Web.py-2388-        self.assertIn("PROCESSING: 1", response.text)
./Biz/PodcastItLater/Web.py-2389-
./Biz/PodcastItLater/Web.py-2390-
./Biz/PodcastItLater/Web.py:2391:class TestMetricsDashboard(BaseWebTest):
./Biz/PodcastItLater/Web.py-2392-    """Test metrics dashboard functionality."""
./Biz/PodcastItLater/Web.py-2393-
./Biz/PodcastItLater/Web.py-2394-    def setUp(self) -> None:
./Biz/PodcastItLater/Web.py-2395-        """Set up test client with logged-in admin user."""
./Biz/PodcastItLater/Web.py-2396-        super().setUp()
--
./Biz/PodcastItLater/Web.py-2535-        self.assertIn("Episode Metrics", response.text)
./Biz/PodcastItLater/Web.py-2536-        # Should show 0 for counts
./Biz/PodcastItLater/Web.py-2537-        self.assertIn("Total Episodes", response.text)
./Biz/PodcastItLater/Web.py-2538-
./Biz/PodcastItLater/Web.py-2539-
./Biz/PodcastItLater/Web.py:2540:class TestJobCancellation(BaseWebTest):
./Biz/PodcastItLater/Web.py-2541-    """Test job cancellation functionality."""
./Biz/PodcastItLater/Web.py-2542-
./Biz/PodcastItLater/Web.py-2543-    def setUp(self) -> None:
./Biz/PodcastItLater/Web.py-2544-        """Set up test client with logged-in user and pending job."""
./Biz/PodcastItLater/Web.py-2545-        super().setUp()
--
./Biz/PodcastItLater/Web.py-2652-            f'hx-post="/queue/{processing_job}/cancel"',
./Biz/PodcastItLater/Web.py-2653-            response.text,
./Biz/PodcastItLater/Web.py-2654-        )
./Biz/PodcastItLater/Web.py-2655-
./Biz/PodcastItLater/Web.py-2656-
./Biz/PodcastItLater/Web.py:2657:class TestEpisodeDetailPage(BaseWebTest):
./Biz/PodcastItLater/Web.py-2658-    """Test episode detail page functionality."""
./Biz/PodcastItLater/Web.py-2659-
./Biz/PodcastItLater/Web.py-2660-    def setUp(self) -> None:
./Biz/PodcastItLater/Web.py-2661-        """Set up test client with user and episode."""
./Biz/PodcastItLater/Web.py-2662-        super().setUp()
--
./Biz/PodcastItLater/Web.py-2753-            response.headers["location"],
./Biz/PodcastItLater/Web.py-2754-            f"/episode/{self.episode_sqid}",
./Biz/PodcastItLater/Web.py-2755-        )
./Biz/PodcastItLater/Web.py-2756-
./Biz/PodcastItLater/Web.py-2757-
./Biz/PodcastItLater/Web.py:2758:class TestPublicFeed(BaseWebTest):
./Biz/PodcastItLater/Web.py-2759-    """Test public feed functionality."""
./Biz/PodcastItLater/Web.py-2760-
./Biz/PodcastItLater/Web.py-2761-    def setUp(self) -> None:
./Biz/PodcastItLater/Web.py-2762-        """Set up test database, client, and create sample episodes."""
./Biz/PodcastItLater/Web.py-2763-        super().setUp()
--
./Biz/PodcastItLater/Web.py-2918-        public_episodes = Core.Database.get_public_episodes()
./Biz/PodcastItLater/Web.py-2919-        episode_ids = [e["id"] for e in public_episodes]
./Biz/PodcastItLater/Web.py-2920-        self.assertIn(user_episode_id, episode_ids)
./Biz/PodcastItLater/Web.py-2921-
./Biz/PodcastItLater/Web.py-2922-
./Biz/PodcastItLater/Web.py:2923:class TestEpisodeDeduplication(BaseWebTest):
./Biz/PodcastItLater/Web.py-2924-    """Test episode deduplication functionality."""
./Biz/PodcastItLater/Web.py-2925-
./Biz/PodcastItLater/Web.py-2926-    def setUp(self) -> None:
./Biz/PodcastItLater/Web.py-2927-        """Set up test database, client, and create test user."""
./Biz/PodcastItLater/Web.py-2928-        super().setUp()
--
./Biz/PodcastItLater/Web.py-2992-        episode_ids = [e["id"] for e in episodes]
./Biz/PodcastItLater/Web.py-2993-
./Biz/PodcastItLater/Web.py-2994-        self.assertIn(self.episode_id, episode_ids)
./Biz/PodcastItLater/Web.py-2995-
./Biz/PodcastItLater/Web.py-2996-
./Biz/PodcastItLater/Web.py:2997:class TestMetricsTracking(BaseWebTest):
./Biz/PodcastItLater/Web.py-2998-    """Test episode metrics tracking."""
./Biz/PodcastItLater/Web.py-2999-
./Biz/PodcastItLater/Web.py-3000-    def setUp(self) -> None:
./Biz/PodcastItLater/Web.py-3001-        """Set up test database, client, and create test episode."""
./Biz/PodcastItLater/Web.py-3002-        super().setUp()
--
./Biz/PodcastItLater/Web.py-3072-        metrics = Core.Database.get_episode_metric_events(self.episode_id)
./Biz/PodcastItLater/Web.py-3073-        played_metrics = [m for m in metrics if m["event_type"] == "played"]
./Biz/PodcastItLater/Web.py-3074-        self.assertGreater(len(played_metrics), 0)
./Biz/PodcastItLater/Web.py-3075-
./Biz/PodcastItLater/Web.py-3076-
./Biz/PodcastItLater/Web.py:3077:class TestUsageLimits(BaseWebTest):
./Biz/PodcastItLater/Web.py-3078-    """Test usage tracking and limit enforcement."""
./Biz/PodcastItLater/Web.py-3079-
./Biz/PodcastItLater/Web.py-3080-    def setUp(self) -> None:
./Biz/PodcastItLater/Web.py-3081-        """Set up test with free tier user."""
./Biz/PodcastItLater/Web.py-3082-        super().setUp()
--
./Biz/PodcastItLater/Web.py-3250-        self.assertTrue(allowed)
./Biz/PodcastItLater/Web.py-3251-        self.assertEqual(msg, "")
./Biz/PodcastItLater/Web.py-3252-        self.assertEqual(usage["articles"], 20)
./Biz/PodcastItLater/Web.py-3253-
./Biz/PodcastItLater/Web.py-3254-
./Biz/PodcastItLater/Web.py:3255:class TestAccountPage(BaseWebTest):
./Biz/PodcastItLater/Web.py-3256-    """Test account page functionality."""
./Biz/PodcastItLater/Web.py-3257-
./Biz/PodcastItLater/Web.py-3258-    def setUp(self) -> None:
./Biz/PodcastItLater/Web.py-3259-        """Set up test with user."""
./Biz/PodcastItLater/Web.py-3260-        super().setUp()
--
./Biz/PodcastItLater/Web.py-3367-        # Verify session cleared
./Biz/PodcastItLater/Web.py-3368-        response = self.client.get("/account", follow_redirects=False)
./Biz/PodcastItLater/Web.py-3369-        self.assertEqual(response.status_code, 307)
./Biz/PodcastItLater/Web.py-3370-
./Biz/PodcastItLater/Web.py-3371-
./Biz/PodcastItLater/Web.py:3372:class TestAdminUsers(BaseWebTest):
./Biz/PodcastItLater/Web.py-3373-    """Test admin user management functionality."""
./Biz/PodcastItLater/Web.py-3374-
./Biz/PodcastItLater/Web.py-3375-    def setUp(self) -> None:
./Biz/PodcastItLater/Web.py-3376-        """Set up test client with logged-in admin user."""
./Biz/PodcastItLater/Web.py-3377-        super().setUp()
--
./Biz/PodcastItLater/Test.py-47-    def tearDown() -> None:
./Biz/PodcastItLater/Test.py-48-        """Clean up after each test."""
./Biz/PodcastItLater/Test.py-49-        Core.Database.teardown()
./Biz/PodcastItLater/Test.py-50-
./Biz/PodcastItLater/Test.py-51-
./Biz/PodcastItLater/Test.py:52:class TestEndToEnd(BaseWebTest):
./Biz/PodcastItLater/Test.py-53-    """Test complete end-to-end flows."""
./Biz/PodcastItLater/Test.py-54-
./Biz/PodcastItLater/Test.py-55-    def setUp(self) -> None:
./Biz/PodcastItLater/Test.py-56-        """Set up test client with logged-in user."""
./Biz/PodcastItLater/Test.py-57-        super().setUp()
--
./Biz/PodcastItLater/Test.py-208-        response = self.client.get(f"/feed/{token2}.rss")
./Biz/PodcastItLater/Test.py-209-        self.assertNotIn("Great Article Title", response.text)
./Biz/PodcastItLater/Test.py-210-        self.assertIn("Other User's Article", response.text)
./Biz/PodcastItLater/Test.py-211-
./Biz/PodcastItLater/Test.py-212-
./Biz/PodcastItLater/Test.py:213:class TestUI(Test.TestCase):
./Biz/PodcastItLater/Test.py-214-    """Test UI components."""
./Biz/PodcastItLater/Test.py-215-
./Biz/PodcastItLater/Test.py-216-    def test_render_navbar(self) -> None:
./Biz/PodcastItLater/Test.py-217-        """Test navbar rendering."""
./Biz/PodcastItLater/Test.py-218-        user = {"email": "test@example.com", "id": 1}
--
./Biz/PodcastItLater.md-189-# Main code implementation
./Biz/PodcastItLater.md-190-class Database:
./Biz/PodcastItLater.md-191-    ...
./Biz/PodcastItLater.md-192-
./Biz/PodcastItLater.md-193-# Test class next to the class it is testing
./Biz/PodcastItLater.md:194:class TestDatabase(Test.TestCase):
./Biz/PodcastItLater.md-195-    """Test the Database class."""
./Biz/PodcastItLater.md-196-
./Biz/PodcastItLater.md-197-    def test_init_db(self) -> None:
./Biz/PodcastItLater.md-198-        """Verify all tables and indexes are created correctly."""
./Biz/PodcastItLater.md-199-        ...
--
./Biz/PodcastItLater/TestMetricsView.py-29-    def tearDown() -> None:
./Biz/PodcastItLater/TestMetricsView.py-30-        """Clean up test database."""
./Biz/PodcastItLater/TestMetricsView.py-31-        Core.Database.teardown()
./Biz/PodcastItLater/TestMetricsView.py-32-
./Biz/PodcastItLater/TestMetricsView.py-33-
./Biz/PodcastItLater/TestMetricsView.py:34:class TestMetricsView(BaseWebTest):
./Biz/PodcastItLater/TestMetricsView.py-35-    """Test Admin Metrics View."""
./Biz/PodcastItLater/TestMetricsView.py-36-
./Biz/PodcastItLater/TestMetricsView.py-37-    def test_admin_metrics_view_access(self) -> None:
./Biz/PodcastItLater/TestMetricsView.py-38-        """Admin user should be able to access metrics view."""
./Biz/PodcastItLater/TestMetricsView.py-39-        # Create admin user
--
./Biz/PodcastItLater/Worker.py-1047-    except Exception:
./Biz/PodcastItLater/Worker.py-1048-        logger.exception("Worker crashed")
./Biz/PodcastItLater/Worker.py-1049-        raise
./Biz/PodcastItLater/Worker.py-1050-
./Biz/PodcastItLater/Worker.py-1051-
./Biz/PodcastItLater/Worker.py:1052:class TestArticleExtraction(Test.TestCase):
./Biz/PodcastItLater/Worker.py-1053-    """Test article extraction functionality."""
./Biz/PodcastItLater/Worker.py-1054-
./Biz/PodcastItLater/Worker.py-1055-    def test_extract_valid_article(self) -> None:
./Biz/PodcastItLater/Worker.py-1056-        """Extract from well-formed HTML."""
./Biz/PodcastItLater/Worker.py-1057-        # Mock trafilatura.fetch_url and extract
--
./Biz/PodcastItLater/Worker.py-1188-        self.assertEqual(content, "Good content")
./Biz/PodcastItLater/Worker.py-1189-        self.assertNotIn("script", content)
./Biz/PodcastItLater/Worker.py-1190-        self.assertNotIn("table", content)
./Biz/PodcastItLater/Worker.py-1191-
./Biz/PodcastItLater/Worker.py-1192-
./Biz/PodcastItLater/Worker.py:1193:class TestTextToSpeech(Test.TestCase):
./Biz/PodcastItLater/Worker.py-1194-    """Test text-to-speech functionality."""
./Biz/PodcastItLater/Worker.py-1195-
./Biz/PodcastItLater/Worker.py-1196-    def setUp(self) -> None:
./Biz/PodcastItLater/Worker.py-1197-        """Set up mocks."""
./Biz/PodcastItLater/Worker.py-1198-        # Mock OpenAI API key
--
./Biz/PodcastItLater/Worker.py-1617-            audio_data = processor.text_to_speech("Test content", "Test Title")
./Biz/PodcastItLater/Worker.py-1618-
./Biz/PodcastItLater/Worker.py-1619-        self.assertEqual(audio_data, b"ordered-audio")
./Biz/PodcastItLater/Worker.py-1620-
./Biz/PodcastItLater/Worker.py-1621-
./Biz/PodcastItLater/Worker.py:1622:class TestIntroOutro(Test.TestCase):
./Biz/PodcastItLater/Worker.py-1623-    """Test intro and outro generation with metadata."""
./Biz/PodcastItLater/Worker.py-1624-
./Biz/PodcastItLater/Worker.py-1625-    def test_create_intro_text_full_metadata(self) -> None:
./Biz/PodcastItLater/Worker.py-1626-        """Test intro text creation with all metadata."""
./Biz/PodcastItLater/Worker.py-1627-        intro = ArticleProcessor._create_intro_text(  # noqa: SLF001
--
./Biz/PodcastItLater/Worker.py-1703-        self.assertEqual(content, "Article content")
./Biz/PodcastItLater/Worker.py-1704-        self.assertEqual(author, "Test Author")
./Biz/PodcastItLater/Worker.py-1705-        self.assertEqual(pub_date, "2024-01-15")
./Biz/PodcastItLater/Worker.py-1706-
./Biz/PodcastItLater/Worker.py-1707-
./Biz/PodcastItLater/Worker.py:1708:class TestMemoryEfficiency(Test.TestCase):
./Biz/PodcastItLater/Worker.py-1709-    """Test memory-efficient processing."""
./Biz/PodcastItLater/Worker.py-1710-
./Biz/PodcastItLater/Worker.py-1711-    def test_large_article_size_limit(self) -> None:
./Biz/PodcastItLater/Worker.py-1712-        """Test that articles exceeding size limits are rejected."""
./Biz/PodcastItLater/Worker.py-1713-        huge_text = "x" * (MAX_ARTICLE_SIZE + 1000)  # Exceed limit
--
./Biz/PodcastItLater/Worker.py-1756-        self.assertIsInstance(usage, float)
./Biz/PodcastItLater/Worker.py-1757-        self.assertGreaterEqual(usage, 0.0)
./Biz/PodcastItLater/Worker.py-1758-        self.assertLessEqual(usage, 100.0)
./Biz/PodcastItLater/Worker.py-1759-
./Biz/PodcastItLater/Worker.py-1760-
./Biz/PodcastItLater/Worker.py:1761:class TestJobProcessing(Test.TestCase):
./Biz/PodcastItLater/Worker.py-1762-    """Test job processing functionality."""
./Biz/PodcastItLater/Worker.py-1763-
./Biz/PodcastItLater/Worker.py-1764-    def setUp(self) -> None:
./Biz/PodcastItLater/Worker.py-1765-        """Set up test environment."""
./Biz/PodcastItLater/Worker.py-1766-        Core.Database.init_db()
--
./Biz/PodcastItLater/Worker.py-2063-
./Biz/PodcastItLater/Worker.py-2064-        # Job should not be processed (no status updates)
./Biz/PodcastItLater/Worker.py-2065-        mock_update.assert_not_called()
./Biz/PodcastItLater/Worker.py-2066-
./Biz/PodcastItLater/Worker.py-2067-
./Biz/PodcastItLater/Worker.py:2068:class TestWorkerErrorHandling(Test.TestCase):
./Biz/PodcastItLater/Worker.py-2069-    """Test worker error handling and recovery."""
./Biz/PodcastItLater/Worker.py-2070-
./Biz/PodcastItLater/Worker.py-2071-    def setUp(self) -> None:
./Biz/PodcastItLater/Worker.py-2072-        """Set up test environment."""
./Biz/PodcastItLater/Worker.py-2073-        Core.Database.init_db()
--
./Biz/PodcastItLater/Worker.py-2151-        self.assertIsNotNone(job)
./Biz/PodcastItLater/Worker.py-2152-        if job:
./Biz/PodcastItLater/Worker.py-2153-            self.assertEqual(job["status"], "error")
./Biz/PodcastItLater/Worker.py-2154-
./Biz/PodcastItLater/Worker.py-2155-
./Biz/PodcastItLater/Worker.py:2156:class TestTextChunking(Test.TestCase):
./Biz/PodcastItLater/Worker.py-2157-    """Test text chunking edge cases."""
./Biz/PodcastItLater/Worker.py-2158-
./Biz/PodcastItLater/Worker.py-2159-    def test_split_text_single_long_word(self) -> None:
./Biz/PodcastItLater/Worker.py-2160-        """Handle text with a single word exceeding limit."""
./Biz/PodcastItLater/Worker.py-2161-        long_word = "a" * 4000
--
./Biz/PodcastItLater/Billing.py-472-
./Biz/PodcastItLater/Billing.py-473-# Tests
./Biz/PodcastItLater/Billing.py-474-# ruff: noqa: PLR6301, PLW0603, S101
./Biz/PodcastItLater/Billing.py-475-
./Biz/PodcastItLater/Billing.py-476-
./Biz/PodcastItLater/Billing.py:477:class TestWebhookHandling(Test.TestCase):
./Biz/PodcastItLater/Billing.py-478-    """Test Stripe webhook handling."""
./Biz/PodcastItLater/Billing.py-479-
./Biz/PodcastItLater/Billing.py-480-    def setUp(self) -> None:
./Biz/PodcastItLater/Billing.py-481-        """Set up test database."""
./Biz/PodcastItLater/Billing.py-482-        Core.Database.init_db()
--
./Biz/PodcastItLater/Core.py-1544-            minutes = total_seconds // 60
./Biz/PodcastItLater/Core.py-1545-
./Biz/PodcastItLater/Core.py-1546-            return {"articles": articles, "minutes": minutes}
./Biz/PodcastItLater/Core.py-1547-
./Biz/PodcastItLater/Core.py-1548-
./Biz/PodcastItLater/Core.py:1549:class TestDatabase(Test.TestCase):
./Biz/PodcastItLater/Core.py-1550-    """Test the Database class."""
./Biz/PodcastItLater/Core.py-1551-
./Biz/PodcastItLater/Core.py-1552-    @staticmethod
./Biz/PodcastItLater/Core.py-1553-    def setUp() -> None:
./Biz/PodcastItLater/Core.py-1554-        """Set up test database."""
--
./Biz/PodcastItLater/Core.py-1638-        self.assertEqual(metrics["total_users"], 1)
./Biz/PodcastItLater/Core.py-1639-        self.assertEqual(metrics["submissions_24h"], 1)
./Biz/PodcastItLater/Core.py-1640-        self.assertEqual(metrics["submissions_7d"], 1)
./Biz/PodcastItLater/Core.py-1641-
./Biz/PodcastItLater/Core.py-1642-
./Biz/PodcastItLater/Core.py:1643:class TestUserManagement(Test.TestCase):
./Biz/PodcastItLater/Core.py-1644-    """Test user management functionality."""
./Biz/PodcastItLater/Core.py-1645-
./Biz/PodcastItLater/Core.py-1646-    @staticmethod
./Biz/PodcastItLater/Core.py-1647-    def setUp() -> None:
./Biz/PodcastItLater/Core.py-1648-        """Set up test database."""
--
./Biz/PodcastItLater/Core.py-1795-        # Try to update user1 to user2's email
./Biz/PodcastItLater/Core.py-1796-        with pytest.raises(ValueError, match="already taken"):
./Biz/PodcastItLater/Core.py-1797-            Database.update_user_email(user_id1, "user2@example.com")
./Biz/PodcastItLater/Core.py-1798-
./Biz/PodcastItLater/Core.py-1799-
./Biz/PodcastItLater/Core.py:1800:class TestQueueOperations(Test.TestCase):
./Biz/PodcastItLater/Core.py-1801-    """Test queue operations."""
./Biz/PodcastItLater/Core.py-1802-
./Biz/PodcastItLater/Core.py-1803-    def setUp(self) -> None:
./Biz/PodcastItLater/Core.py-1804-        """Set up test database with a user."""
./Biz/PodcastItLater/Core.py-1805-        Database.init_db()
--
./Biz/PodcastItLater/Core.py-2041-        self.assertEqual(Database.get_queue_position(job1), 1)
./Biz/PodcastItLater/Core.py-2042-        self.assertIsNone(Database.get_queue_position(job2))
./Biz/PodcastItLater/Core.py-2043-        self.assertEqual(Database.get_queue_position(job3), 2)
./Biz/PodcastItLater/Core.py-2044-
./Biz/PodcastItLater/Core.py-2045-
./Biz/PodcastItLater/Core.py:2046:class TestEpisodeManagement(Test.TestCase):
./Biz/PodcastItLater/Core.py-2047-    """Test episode management functionality."""
./Biz/PodcastItLater/Core.py-2048-
./Biz/PodcastItLater/Core.py-2049-    def setUp(self) -> None:
./Biz/PodcastItLater/Core.py-2050-        """Set up test database with a user."""
./Biz/PodcastItLater/Core.py-2051-        Database.init_db()
💭Thought[engineer]3 months ago

Now let me check the Worker.py file to see the test classes and understand where they are:

Grep "^class Test" in Biz/PodcastItLater/Worker.py
1052:class TestArticleExtraction(Test.TestCase):
1193:class TestTextToSpeech(Test.TestCase):
1622:class TestIntroOutro(Test.TestCase):
1708:class TestMemoryEfficiency(Test.TestCase):
1761:class TestJobProcessing(Test.TestCase):
2068:class TestWorkerErrorHandling(Test.TestCase):
2156:class TestTextChunking(Test.TestCase):
💭Thought[engineer]3 months ago

Let me get a full overview of the file structure:

Grep "^(class |def |# )" in Biz/PodcastItLater/Worker.py
35 lines
3:# : dep boto3
4:# : dep botocore
5:# : dep openai
6:# : dep psutil
7:# : dep pydub
8:# : dep pytest
9:# : dep pytest-asyncio
10:# : dep pytest-mock
11:# : dep trafilatura
12:# : out podcastitlater-worker
13:# : run ffmpeg
47:# Configuration from environment variables
55:# Worker configuration
67:class ShutdownHandler:
103:class ArticleProcessor:
721:def prepare_text_for_tts(text: str, title: str) -> list[str]:
761:def split_text_into_chunks(text: str, max_chars: int = 3000) -> list[str]:
799:def edit_chunk_for_speech(
874:def parse_datetime_with_timezone(created_at: str | datetime) -> datetime:
890:def should_retry_job(job: dict[str, Any], max_retries: int) -> bool:
907:def process_pending_jobs(
939:def process_retryable_jobs() -> None:
958:def check_memory_usage() -> int | Any:
969:def cleanup_stale_jobs() -> None:
990:def main_loop() -> None:
1036:def move() -> None:
1052:class TestArticleExtraction(Test.TestCase):
1193:class TestTextToSpeech(Test.TestCase):
1622:class TestIntroOutro(Test.TestCase):
1708:class TestMemoryEfficiency(Test.TestCase):
1761:class TestJobProcessing(Test.TestCase):
2068:class TestWorkerErrorHandling(Test.TestCase):
2156:class TestTextChunking(Test.TestCase):
2179:def test() -> None:
2194:def main() -> None:
💭Thought[engineer]3 months ago

Let me check the total lines and get a better picture:

ϟwc -l Biz/PodcastItLater/Worker.py
💭Thought[engineer]3 months ago

Good. Now let me read the key sections to understand the structure better:

Read Biz/PodcastItLater/Worker.py @103-140
38 lines
103: class ArticleProcessor:
104:     """Handles the complete article-to-podcast conversion pipeline."""
105: 
106:     def __init__(self, shutdown_handler: ShutdownHandler) -> None:
107:         """Initialize the processor with required services.
108: 
109:         Raises:
110:             ValueError: If OPENAI_API_KEY environment variable is not set.
111:         """
112:         if not OPENAI_API_KEY:
113:             msg = "OPENAI_API_KEY environment variable is required"
114:             raise ValueError(msg)
115: 
116:         self.openai_client: openai.OpenAI = openai.OpenAI(
117:             api_key=OPENAI_API_KEY,
118:         )
119:         self.shutdown_handler = shutdown_handler
120: 
121:         # Initialize S3 client for Digital Ocean Spaces
122:         if all([S3_ENDPOINT, S3_BUCKET, S3_ACCESS_KEY, S3_SECRET_KEY]):
123:             self.s3_client: Any = boto3.client(
124:                 "s3",
125:                 endpoint_url=S3_ENDPOINT,
126:                 aws_access_key_id=S3_ACCESS_KEY,
127:                 aws_secret_access_key=S3_SECRET_KEY,
128:             )
129:         else:
130:             logger.warning("S3 configuration incomplete, uploads will fail")
131:             self.s3_client = None
132: 
133:     @staticmethod
134:     def extract_article_content(
135:         url: str,
136:     ) -> tuple[str, str, str | None, str | None]:
137:         """Extract title, content, author, and date from article URL.
138: 
139:         Returns:
140:             tuple: (title, content, author, publication_date)
Read Biz/PodcastItLater/Worker.py @715-730
16 lines
715:             raise
716:         finally:
717:             # Clear current job
718:             self.shutdown_handler.set_current_job(None)
719: 
720: 
721: def prepare_text_for_tts(text: str, title: str) -> list[str]:
722:     """Use LLM to prepare text for TTS, returning chunks ready for speech.
723: 
724:     First splits text mechanically, then has LLM edit each chunk.
725:     """
726:     # First, split the text into manageable chunks
727:     raw_chunks = split_text_into_chunks(text, max_chars=3000)
728: 
729:     logger.info("Split article into %d raw chunks", len(raw_chunks))
730: 
💭Thought[engineer]3 months ago

Let me read the functions that need to be moved:

Read Biz/PodcastItLater/Worker.py @721-873
153 lines
721: def prepare_text_for_tts(text: str, title: str) -> list[str]:
722:     """Use LLM to prepare text for TTS, returning chunks ready for speech.
723: 
724:     First splits text mechanically, then has LLM edit each chunk.
725:     """
726:     # First, split the text into manageable chunks
727:     raw_chunks = split_text_into_chunks(text, max_chars=3000)
728: 
729:     logger.info("Split article into %d raw chunks", len(raw_chunks))
730: 
731:     # Prepare the first chunk with intro
732:     edited_chunks = []
733: 
734:     for i, chunk in enumerate(raw_chunks):
735:         is_first = i == 0
736:         is_last = i == len(raw_chunks) - 1
737: 
738:         try:
739:             edited_chunk = edit_chunk_for_speech(
740:                 chunk,
741:                 title=title if is_first else None,
742:                 is_first=is_first,
743:                 is_last=is_last,
744:             )
745:             edited_chunks.append(edited_chunk)
746:         except Exception:
747:             logger.exception("Failed to edit chunk %d", i + 1)
748:             # Fall back to raw chunk if LLM fails
749:             if is_first:
750:                 edited_chunks.append(
751:                     f"This is an audio version of {title}. {chunk}",
752:                 )
753:             elif is_last:
754:                 edited_chunks.append(f"{chunk} This concludes the article.")
755:             else:
756:                 edited_chunks.append(chunk)
757: 
758:     return edited_chunks
759: 
760: 
761: def split_text_into_chunks(text: str, max_chars: int = 3000) -> list[str]:
762:     """Split text into chunks at sentence boundaries."""
763:     chunks = []
764:     current_chunk = ""
765: 
766:     # Split into paragraphs first
767:     paragraphs = text.split("\n\n")
768: 
769:     for para in paragraphs:
770:         para_stripped = para.strip()
771:         if not para_stripped:
772:             continue
773: 
774:         # If paragraph itself is too long, split by sentences
775:         if len(para_stripped) > max_chars:
776:             sentences = para_stripped.split(". ")
777:             for sentence in sentences:
778:                 if len(current_chunk) + len(sentence) + 2 < max_chars:
779:                     current_chunk += sentence + ". "
780:                 else:
781:                     if current_chunk:
782:                         chunks.append(current_chunk.strip())
783:                     current_chunk = sentence + ". "
784:         # If adding this paragraph would exceed limit, start new chunk
785:         elif len(current_chunk) + len(para_stripped) + 2 > max_chars:
786:             if current_chunk:
787:                 chunks.append(current_chunk.strip())
788:             current_chunk = para_stripped + " "
789:         else:
790:             current_chunk += para_stripped + " "
791: 
792:     # Don't forget the last chunk
793:     if current_chunk:
794:         chunks.append(current_chunk.strip())
795: 
796:     return chunks
797: 
798: 
799: def edit_chunk_for_speech(
800:     chunk: str,
801:     title: str | None = None,
802:     *,
803:     is_first: bool = False,
804:     is_last: bool = False,
805: ) -> str:
806:     """Use LLM to lightly edit a single chunk for speech.
807: 
808:     Raises:
809:         ValueError: If no content is returned from LLM.
810:     """
811:     system_prompt = (
812:         "You are a podcast script editor. Your job is to lightly edit text "
813:         "to make it sound natural when spoken aloud.\n\n"
814:         "Guidelines:\n"
815:     )
816:     system_prompt += """
817: - Remove URLs and email addresses, replacing with descriptive phrases
818: - Convert bullet points and lists into flowing sentences
819: - Fix any awkward phrasing for speech
820: - Remove references like "click here" or "see below"
821: - Keep edits minimal - preserve the original content and style
822: - Do NOT add commentary or explanations
823: - Return ONLY the edited text, no JSON or formatting
824: """
825: 
826:     user_prompt = chunk
827: 
828:     # Add intro/outro if needed
829:     if is_first and title:
830:         user_prompt = (
831:             f"Add a brief intro mentioning this is an audio version of "
832:             f"'{title}', then edit this text:\n\n{chunk}"
833:         )
834:     elif is_last:
835:         user_prompt = f"Edit this text and add a brief closing:\n\n{chunk}"
836: 
837:     try:
838:         client: openai.OpenAI = openai.OpenAI(api_key=OPENAI_API_KEY)
839:         response = client.chat.completions.create(
840:             model="gpt-4o-mini",
841:             messages=[
842:                 {"role": "system", "content": system_prompt},
843:                 {"role": "user", "content": user_prompt},
844:             ],
845:             temperature=0.3,  # Lower temperature for more consistent edits
846:             max_tokens=4000,
847:         )
848: 
849:         content = response.choices[0].message.content
850:         if not content:
851:             msg = "No content returned from LLM"
852:             raise ValueError(msg)  # noqa: TRY301
853: 
854:         # Ensure the chunk isn't too long
855:         max_chunk_length = 4000
856:         if len(content) > max_chunk_length:
857:             # Truncate at sentence boundary
858:             sentences = content.split(". ")
859:             truncated = ""
860:             for sentence in sentences:
861:                 if len(truncated) + len(sentence) + 2 < max_chunk_length:
862:                     truncated += sentence + ". "
863:                 else:
864:                     break
865:             content = truncated.strip()
866: 
867:     except Exception:
868:         logger.exception("LLM chunk editing failed")
869:         raise
870:     else:
871:         return content
872: 
873: 
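Before moving `split_text_into_chunks` into Worker/TextProcessing.py, it is worth pinning down its behavior in isolation. A standalone sketch (the function body is copied verbatim from the Read output above; the sample text and `max_chars` value are made up for illustration):

```python
def split_text_into_chunks(text: str, max_chars: int = 3000) -> list[str]:
    """Split text into chunks at sentence boundaries (copied from Worker.py)."""
    chunks = []
    current_chunk = ""

    # Split into paragraphs first
    paragraphs = text.split("\n\n")

    for para in paragraphs:
        para_stripped = para.strip()
        if not para_stripped:
            continue

        # If paragraph itself is too long, split by sentences
        if len(para_stripped) > max_chars:
            sentences = para_stripped.split(". ")
            for sentence in sentences:
                if len(current_chunk) + len(sentence) + 2 < max_chars:
                    current_chunk += sentence + ". "
                else:
                    if current_chunk:
                        chunks.append(current_chunk.strip())
                    current_chunk = sentence + ". "
        # If adding this paragraph would exceed limit, start new chunk
        elif len(current_chunk) + len(para_stripped) + 2 > max_chars:
            if current_chunk:
                chunks.append(current_chunk.strip())
            current_chunk = para_stripped + " "
        else:
            current_chunk += para_stripped + " "

    # Don't forget the last chunk
    if current_chunk:
        chunks.append(current_chunk.strip())

    return chunks


# Short paragraphs accumulate until the limit forces a new chunk.
text = "aaaa.\n\nbbbb.\n\ncccc."
print(split_text_into_chunks(text, max_chars=15))  # → ['aaaa. bbbb.', 'cccc.']
```

Because the function is pure (no OpenAI, S3, or database access), it can move to Worker/TextProcessing.py without dragging along any of Worker.py's configuration, which is what makes this split clean.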
Read Biz/PodcastItLater/Worker.py @874-1035
162 lines
874: def parse_datetime_with_timezone(created_at: str | datetime) -> datetime:
875:     """Parse datetime string and ensure it has timezone info."""
876:     if isinstance(created_at, str):
877:         # Handle timezone-aware datetime strings
878:         if created_at.endswith("Z"):
879:             created_at = created_at[:-1] + "+00:00"
880:         last_attempt = datetime.fromisoformat(created_at)
881:         if last_attempt.tzinfo is None:
882:             last_attempt = last_attempt.replace(tzinfo=timezone.utc)
883:     else:
884:         last_attempt = created_at
885:         if last_attempt.tzinfo is None:
886:             last_attempt = last_attempt.replace(tzinfo=timezone.utc)
887:     return last_attempt
888: 
889: 
890: def should_retry_job(job: dict[str, Any], max_retries: int) -> bool:
891:     """Check if a job should be retried based on retry count and backoff time.
892: 
893:     Uses exponential backoff to determine if enough time has passed.
894:     """
895:     retry_count = job["retry_count"]
896:     if retry_count >= max_retries:
897:         return False
898: 
899:     # Exponential backoff: 30s, 60s, 120s
900:     backoff_time = 30 * (2**retry_count)
901:     last_attempt = parse_datetime_with_timezone(job["created_at"])
902:     time_since_attempt = datetime.now(tz=timezone.utc) - last_attempt
903: 
904:     return time_since_attempt > timedelta(seconds=backoff_time)
905: 
906: 
907: def process_pending_jobs(
908:     processor: ArticleProcessor,
909: ) -> None:
910:     """Process all pending jobs."""
911:     pending_jobs = Core.Database.get_pending_jobs(
912:         limit=5,
913:     )
914: 
915:     for job in pending_jobs:
916:         if processor.shutdown_handler.is_shutdown_requested():
917:             logger.info("Shutdown requested, stopping job processing")
918:             break
919: 
920:         current_job = job["id"]
921:         try:
922:             processor.process_job(job)
923:         except Exception as e:
924:             # Ensure job is marked as error even if process_job didn't handle it
925:             logger.exception("Failed to process job: %d", current_job)
926:             # Check if job is still in processing state
927:             current_status = Core.Database.get_job_by_id(
928:                 current_job,
929:             )
930:             if current_status and current_status.get("status") == "processing":
931:                 Core.Database.update_job_status(
932:                     current_job,
933:                     "error",
934:                     str(e),
935:                 )
936:             continue
937: 
938: 
939: def process_retryable_jobs() -> None:
940:     """Check and retry failed jobs with exponential backoff."""
941:     retryable_jobs = Core.Database.get_retryable_jobs(
942:         MAX_RETRIES,
943:     )
944: 
945:     for job in retryable_jobs:
946:         if should_retry_job(job, MAX_RETRIES):
947:             logger.info(
948:                 "Retrying job %d (attempt %d)",
949:                 job["id"],
950:                 job["retry_count"] + 1,
951:             )
952:             Core.Database.update_job_status(
953:                 job["id"],
954:                 "pending",
955:             )
956: 
957: 
958: def check_memory_usage() -> int | Any:
959:     """Check current memory usage percentage."""
960:     try:
961:         process = psutil.Process()
962:         # this returns an int but psutil is untyped
963:         return process.memory_percent()
964:     except (psutil.Error, OSError):
965:         logger.warning("Failed to check memory usage")
966:         return 0.0
967: 
968: 
969: def cleanup_stale_jobs() -> None:
970:     """Reset jobs stuck in processing state on startup."""
971:     with Core.Database.get_connection() as conn:
972:         cursor = conn.cursor()
973:         cursor.execute(
974:             """
975:             UPDATE queue
976:             SET status = 'pending'
977:             WHERE status = 'processing'
978:             """,
979:         )
980:         affected = cursor.rowcount
981:         conn.commit()
982: 
983:         if affected > 0:
984:             logger.info(
985:                 "Reset %d stale jobs from processing to pending",
986:                 affected,
987:             )
988: 
989: 
990: def main_loop() -> None:
991:     """Poll for jobs and process them in a continuous loop."""
992:     shutdown_handler = ShutdownHandler()
993:     processor = ArticleProcessor(shutdown_handler)
994: 
995:     # Clean up any stale jobs from previous runs
996:     cleanup_stale_jobs()
997: 
998:     logger.info("Worker started, polling for jobs...")
999: 
1000:     while not shutdown_handler.is_shutdown_requested():
1001:         try:
1002:             # Process pending jobs
1003:             process_pending_jobs(processor)
1004:             process_retryable_jobs()
1005: 
1006:             # Check if there's any work
1007:             pending_jobs = Core.Database.get_pending_jobs(
1008:                 limit=1,
1009:             )
1010:             retryable_jobs = Core.Database.get_retryable_jobs(
1011:                 MAX_RETRIES,
1012:             )
1013: 
1014:             if not pending_jobs and not retryable_jobs:
1015:                 logger.debug("No jobs to process, sleeping...")
1016: 
1017:         except Exception:
1018:             logger.exception("Error in main loop")
1019: 
1020:         # Use interruptible sleep
1021:         if not shutdown_handler.is_shutdown_requested():
1022:             shutdown_handler.shutdown_requested.wait(timeout=POLL_INTERVAL)
1023: 
1024:     # Graceful shutdown
1025:     current_job = shutdown_handler.get_current_job()
1026:     if current_job:
1027:         logger.info(
1028:             "Waiting for job %d to complete before shutdown...",
1029:             current_job,
1030:         )
1031:         # The job will complete or be reset to pending
1032: 
1033:     logger.info("Worker shutdown complete")
1034: 
1035: 
Read Biz/PodcastItLater/Worker.py @1036-1060
25 lines
1036: def move() -> None:
1037:     """Make the worker move."""
1038:     try:
1039:         # Initialize database
1040:         Core.Database.init_db()
1041: 
1042:         # Start main processing loop
1043:         main_loop()
1044: 
1045:     except KeyboardInterrupt:
1046:         logger.info("Worker stopped by user")
1047:     except Exception:
1048:         logger.exception("Worker crashed")
1049:         raise
1050: 
1051: 
1052: class TestArticleExtraction(Test.TestCase):
1053:     """Test article extraction functionality."""
1054: 
1055:     def test_extract_valid_article(self) -> None:
1056:         """Extract from well-formed HTML."""
1057:         # Mock trafilatura.fetch_url and extract
1058:         mock_html = (
1059:             "<html><body><h1>Test Article</h1><p>Content here</p></body></html>"
1060:         )
Read Biz/PodcastItLater/Worker.py @2179-2199
21 lines
2179: def test() -> None:
2180:     """Run the tests."""
2181:     Test.run(
2182:         App.Area.Test,
2183:         [
2184:             TestArticleExtraction,
2185:             TestTextToSpeech,
2186:             TestMemoryEfficiency,
2187:             TestJobProcessing,
2188:             TestWorkerErrorHandling,
2189:             TestTextChunking,
2190:         ],
2191:     )
2192: 
2193: 
2194: def main() -> None:
2195:     """Entry point for the worker."""
2196:     if "test" in sys.argv:
2197:         test()
2198:     else:
2199:         move()
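Each new module needs its own `test()`/`main()` pair following this same convention. A minimal sketch of the dispatch shape (hypothetical: it returns the chosen mode instead of executing it, so the pattern itself is checkable; the real modules call `Test.run` and the worker loop):

```python
def dispatch(argv: list[str]) -> str:
    """Mirror Worker.py's entry-point convention for each split module.

    `prog test` runs the module's local test classes; anything else
    starts normal operation. Returns the mode name for illustration only.
    """
    if "test" in argv:
        return "test"  # real code: test() -> Test.run(App.Area.Test, [...])
    return "move"      # real code: move() -> init_db() + main_loop()


print(dispatch(["worker", "test"]))  # → test
print(dispatch(["worker"]))          # → move
```

This is why the task says to keep tests next to the code they cover: each module's `test()` registers only its own TestCase classes, and Worker.py's top-level `test()` shrinks to just the classes that stay behind.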
💭Thought[engineer]3 months ago

Now let me check where ArticleProcessor class ends:

Read Biz/PodcastItLater/Worker.py @700-725
26 lines
700: 
701:             logger.info(
702:                 "Successfully processed job %d -> episode %d",
703:                 job_id,
704:                 episode_id,
705:             )
706: 
707:         except Exception as e:
708:             error_msg = str(e)
709:             logger.exception("Job %d failed: %s", job_id, error_msg)
710:             Core.Database.update_job_status(
711:                 job_id,
712:                 "error",
713:                 error_msg,
714:             )
715:             raise
716:         finally:
717:             # Clear current job
718:             self.shutdown_handler.set_current_job(None)
719: 
720: 
721: def prepare_text_for_tts(text: str, title: str) -> list[str]:
722:     """Use LLM to prepare text for TTS, returning chunks ready for speech.
723: 
724:     First splits text mechanically, then has LLM edit each chunk.
725:     """
💭Thought[engineer]3 months ago

Now I have enough context. Let me check the test classes to understand which tests go with which functionality:

Read Biz/PodcastItLater/Worker.py @1052-1195
144 lines
1052: class TestArticleExtraction(Test.TestCase):
1053:     """Test article extraction functionality."""
1054: 
1055:     def test_extract_valid_article(self) -> None:
1056:         """Extract from well-formed HTML."""
1057:         # Mock trafilatura.fetch_url and extract
1058:         mock_html = (
1059:             "<html><body><h1>Test Article</h1><p>Content here</p></body></html>"
1060:         )
1061:         mock_result = json.dumps({
1062:             "title": "Test Article",
1063:             "text": "Content here",
1064:         })
1065: 
1066:         with (
1067:             unittest.mock.patch(
1068:                 "trafilatura.fetch_url",
1069:                 return_value=mock_html,
1070:             ),
1071:             unittest.mock.patch(
1072:                 "trafilatura.extract",
1073:                 return_value=mock_result,
1074:             ),
1075:         ):
1076:             title, content, author, pub_date = (
1077:                 ArticleProcessor.extract_article_content(
1078:                     "https://example.com",
1079:                 )
1080:             )
1081: 
1082:         self.assertEqual(title, "Test Article")
1083:         self.assertEqual(content, "Content here")
1084:         self.assertIsNone(author)
1085:         self.assertIsNone(pub_date)
1086: 
1087:     def test_extract_missing_title(self) -> None:
1088:         """Handle articles without titles."""
1089:         mock_html = "<html><body><p>Content without title</p></body></html>"
1090:         mock_result = json.dumps({"text": "Content without title"})
1091: 
1092:         with (
1093:             unittest.mock.patch(
1094:                 "trafilatura.fetch_url",
1095:                 return_value=mock_html,
1096:             ),
1097:             unittest.mock.patch(
1098:                 "trafilatura.extract",
1099:                 return_value=mock_result,
1100:             ),
1101:         ):
1102:             title, content, author, pub_date = (
1103:                 ArticleProcessor.extract_article_content(
1104:                     "https://example.com",
1105:                 )
1106:             )
1107: 
1108:         self.assertEqual(title, "Untitled Article")
1109:         self.assertEqual(content, "Content without title")
1110:         self.assertIsNone(author)
1111:         self.assertIsNone(pub_date)
1112: 
1113:     def test_extract_empty_content(self) -> None:
1114:         """Handle empty articles."""
1115:         mock_html = "<html><body></body></html>"
1116:         mock_result = json.dumps({"title": "Empty Article", "text": ""})
1117: 
1118:         with (
1119:             unittest.mock.patch(
1120:                 "trafilatura.fetch_url",
1121:                 return_value=mock_html,
1122:             ),
1123:             unittest.mock.patch(
1124:                 "trafilatura.extract",
1125:                 return_value=mock_result,
1126:             ),
1127:             pytest.raises(ValueError, match="No content extracted") as cm,
1128:         ):
1129:             ArticleProcessor.extract_article_content(
1130:                 "https://example.com",
1131:             )
1132: 
1133:         self.assertIn("No content extracted", str(cm.value))
1134: 
1135:     def test_extract_network_error(self) -> None:
1136:         """Handle connection failures."""
1137:         with (
1138:             unittest.mock.patch("trafilatura.fetch_url", return_value=None),
1139:             pytest.raises(ValueError, match="Failed to download") as cm,
1140:         ):
1141:             ArticleProcessor.extract_article_content("https://example.com")
1142: 
1143:         self.assertIn("Failed to download", str(cm.value))
1144: 
1145:     @staticmethod
1146:     def test_extract_timeout() -> None:
1147:         """Handle slow responses."""
1148:         with (
1149:             unittest.mock.patch(
1150:                 "trafilatura.fetch_url",
1151:                 side_effect=TimeoutError("Timeout"),
1152:             ),
1153:             pytest.raises(TimeoutError),
1154:         ):
1155:             ArticleProcessor.extract_article_content("https://example.com")
1156: 
1157:     def test_content_sanitization(self) -> None:
1158:         """Remove unwanted elements."""
1159:         mock_html = """
1160:         <html><body>
1161:             <h1>Article</h1>
1162:             <p>Good content</p>
1163:             <script>alert('bad')</script>
1164:             <table><tr><td>data</td></tr></table>
1165:         </body></html>
1166:         """
1167:         mock_result = json.dumps({
1168:             "title": "Article",
1169:             "text": "Good content",  # Tables and scripts removed
1170:         })
1171: 
1172:         with (
1173:             unittest.mock.patch(
1174:                 "trafilatura.fetch_url",
1175:                 return_value=mock_html,
1176:             ),
1177:             unittest.mock.patch(
1178:                 "trafilatura.extract",
1179:                 return_value=mock_result,
1180:             ),
1181:         ):
1182:             _title, content, _author, _pub_date = (
1183:                 ArticleProcessor.extract_article_content(
1184:                     "https://example.com",
1185:                 )
1186:             )
1187: 
1188:         self.assertEqual(content, "Good content")
1189:         self.assertNotIn("script", content)
1190:         self.assertNotIn("table", content)
1191: 
1192: 
1193: class TestTextToSpeech(Test.TestCase):
1194:     """Test text-to-speech functionality."""
1195: 
💭Thought[engineer]3 months ago

Let me check where each test class starts so I can gauge their sizes and decide what to move:

ϟgrep -n "^class Test" Biz/PodcastItLater/Worker.py
1052:class TestArticleExtraction(Test.TestCase):
1193:class TestTextToSpeech(Test.TestCase):
1622:class TestIntroOutro(Test.TestCase):
1708:class TestMemoryEfficiency(Test.TestCase):
1761:class TestJobProcessing(Test.TestCase):
2068:class TestWorkerErrorHandling(Test.TestCase):
2156:class TestTextChunking(Test.TestCase):
💭Thought[engineer]3 months ago

Now let me understand the plan from the task:

Based on the task description, I need to:

1. Create Worker/Processor.py with the ArticleProcessor class + TestArticleExtraction, TestTextToSpeech, TestIntroOutro
2. Create Worker/TextProcessing.py with the text processing functions + TestTextChunking
3. Create Worker/Jobs.py with the job management functions + TestJobProcessing
4. Keep in Worker.py: imports, config, ShutdownHandler, move(), test(), main(), TestMemoryEfficiency, TestWorkerErrorHandling

Let me first create the directory structure:

ϟmkdir -p Biz/PodcastItLater/Worker
💭Thought[engineer]3 months ago

Now let me read the ArticleProcessor class completely to extract it properly:

Read Biz/PodcastItLater/Worker.py @103-720
618 lines
103: class ArticleProcessor:
104:     """Handles the complete article-to-podcast conversion pipeline."""
105: 
106:     def __init__(self, shutdown_handler: ShutdownHandler) -> None:
107:         """Initialize the processor with required services.
108: 
109:         Raises:
110:             ValueError: If OPENAI_API_KEY environment variable is not set.
111:         """
112:         if not OPENAI_API_KEY:
113:             msg = "OPENAI_API_KEY environment variable is required"
114:             raise ValueError(msg)
115: 
116:         self.openai_client: openai.OpenAI = openai.OpenAI(
117:             api_key=OPENAI_API_KEY,
118:         )
119:         self.shutdown_handler = shutdown_handler
120: 
121:         # Initialize S3 client for Digital Ocean Spaces
122:         if all([S3_ENDPOINT, S3_BUCKET, S3_ACCESS_KEY, S3_SECRET_KEY]):
123:             self.s3_client: Any = boto3.client(
124:                 "s3",
125:                 endpoint_url=S3_ENDPOINT,
126:                 aws_access_key_id=S3_ACCESS_KEY,
127:                 aws_secret_access_key=S3_SECRET_KEY,
128:             )
129:         else:
130:             logger.warning("S3 configuration incomplete, uploads will fail")
131:             self.s3_client = None
132: 
133:     @staticmethod
134:     def extract_article_content(
135:         url: str,
136:     ) -> tuple[str, str, str | None, str | None]:
137:         """Extract title, content, author, and date from article URL.
138: 
139:         Returns:
140:             tuple: (title, content, author, publication_date)
141: 
142:         Raises:
143:             ValueError: If content cannot be downloaded, extracted, or large.
144:         """
145:         try:
146:             downloaded = trafilatura.fetch_url(url)
147:             if not downloaded:
148:                 msg = f"Failed to download content from {url}"
149:                 raise ValueError(msg)  # noqa: TRY301
150: 
151:             # Check size before processing
152:             if (
153:                 len(downloaded) > MAX_ARTICLE_SIZE * 4
154:             ):  # Rough HTML to text ratio
155:                 msg = f"Article too large: {len(downloaded)} bytes"
156:                 raise ValueError(msg)  # noqa: TRY301
157: 
158:             # Extract with metadata
159:             result = trafilatura.extract(
160:                 downloaded,
161:                 include_comments=False,
162:                 include_tables=False,
163:                 with_metadata=True,
164:                 output_format="json",
165:             )
166: 
167:             if not result:
168:                 msg = f"Failed to extract content from {url}"
169:                 raise ValueError(msg)  # noqa: TRY301
170: 
171:             data = json.loads(result)
172: 
173:             title = data.get("title", "Untitled Article")
174:             content = data.get("text", "")
175:             author = data.get("author")
176:             pub_date = data.get("date")
177: 
178:             if not content:
179:                 msg = f"No content extracted from {url}"
180:                 raise ValueError(msg)  # noqa: TRY301
181: 
182:             # Enforce content size limit
183:             if len(content) > MAX_ARTICLE_SIZE:
184:                 logger.warning(
185:                     "Article content truncated from %d to %d characters",
186:                     len(content),
187:                     MAX_ARTICLE_SIZE,
188:                 )
189:                 content = content[:MAX_ARTICLE_SIZE]
190: 
191:             logger.info(
192:                 "Extracted article: %s (%d chars, author: %s, date: %s)",
193:                 title,
194:                 len(content),
195:                 author or "unknown",
196:                 pub_date or "unknown",
197:             )
198:         except Exception:
199:             logger.exception("Failed to extract content from %s", url)
200:             raise
201:         else:
202:             return title, content, author, pub_date
203: 
204:     def text_to_speech(
205:         self,
206:         text: str,
207:         title: str,
208:         author: str | None = None,
209:         pub_date: str | None = None,
210:     ) -> bytes:
211:         """Convert text to speech with intro/outro using OpenAI TTS API.
212: 
213:         Uses parallel processing for chunks while maintaining order.
214:         Adds intro with metadata and outro with attribution.
215: 
216:         Args:
217:             text: Article content to convert
218:             title: Article title
219:             author: Article author (optional)
220:             pub_date: Publication date (optional)
221: 
222:         Raises:
223:             ValueError: If no chunks are generated from text.
224:         """
225:         try:
226:             # Generate intro audio
227:             intro_text = self._create_intro_text(title, author, pub_date)
228:             intro_audio = self._generate_tts_segment(intro_text)
229: 
230:             # Generate outro audio
231:             outro_text = self._create_outro_text(title, author)
232:             outro_audio = self._generate_tts_segment(outro_text)
233: 
234:             # Use LLM to prepare and chunk the main content
235:             chunks = prepare_text_for_tts(text, title)
236: 
237:             if not chunks:
238:                 msg = "No chunks generated from text"
239:                 raise ValueError(msg)  # noqa: TRY301
240: 
241:             logger.info("Processing %d chunks for TTS", len(chunks))
242: 
243:             # Check memory before parallel processing
244:             mem_usage = check_memory_usage()
245:             if mem_usage > MEMORY_THRESHOLD - 20:  # Leave 20% buffer
246:                 logger.warning(
247:                     "High memory usage (%.1f%%), falling back to serial "
248:                     "processing",
249:                     mem_usage,
250:                 )
251:                 content_audio_bytes = self._text_to_speech_serial(chunks)
252:             else:
253:                 # Determine max workers
254:                 max_workers = min(
255:                     4,  # Reasonable limit to avoid rate limiting
256:                     len(chunks),  # No more workers than chunks
257:                 max(1, (psutil.cpu_count() or 2) // 2),  # Half the cores; cpu_count may be None
258:                 )
259: 
260:                 logger.info(
261:                     "Using %d workers for parallel TTS processing",
262:                     max_workers,
263:                 )
264: 
265:                 # Process chunks in parallel
266:                 chunk_results: list[tuple[int, bytes]] = []
267: 
268:                 with concurrent.futures.ThreadPoolExecutor(
269:                     max_workers=max_workers,
270:                 ) as executor:
271:                     # Submit all chunks for processing
272:                     future_to_index = {
273:                         executor.submit(self._process_tts_chunk, chunk, i): i
274:                         for i, chunk in enumerate(chunks)
275:                     }
276: 
277:                     # Collect results as they complete
278:                     for future in concurrent.futures.as_completed(
279:                         future_to_index,
280:                     ):
281:                         index = future_to_index[future]
282:                         try:
283:                             audio_data = future.result()
284:                             chunk_results.append((index, audio_data))
285:                         except Exception:
286:                             logger.exception(
287:                                 "Failed to process chunk %d",
288:                                 index,
289:                             )
290:                             raise
291: 
292:                 # Sort results by index to maintain order
293:                 chunk_results.sort(key=operator.itemgetter(0))
294: 
295:                 # Combine audio chunks
296:                 content_audio_bytes = self._combine_audio_chunks([
297:                     data for _, data in chunk_results
298:                 ])
299: 
300:             # Combine intro, content, and outro with pauses
301:             return ArticleProcessor._combine_intro_content_outro(
302:                 intro_audio,
303:                 content_audio_bytes,
304:                 outro_audio,
305:             )
306: 
307:         except Exception:
308:             logger.exception("TTS generation failed")
309:             raise
310: 
311:     @staticmethod
312:     def _create_intro_text(
313:         title: str,
314:         author: str | None,
315:         pub_date: str | None,
316:     ) -> str:
317:         """Create intro text with available metadata."""
318:         parts = [f"Title: {title}"]
319: 
320:         if author:
321:             parts.append(f"Author: {author}")
322: 
323:         if pub_date:
324:             parts.append(f"Published: {pub_date}")
325: 
326:         return ". ".join(parts) + "."
327: 
328:     @staticmethod
329:     def _create_outro_text(title: str, author: str | None) -> str:
330:         """Create outro text with attribution."""
331:         if author:
332:             return (
333:                 f"This has been an audio version of {title} "
334:                 f"by {author}, created using Podcast It Later."
335:             )
336:         return (
337:             f"This has been an audio version of {title}, "
338:             "created using Podcast It Later."
339:         )
340: 
341:     def _generate_tts_segment(self, text: str) -> bytes:
342:         """Generate TTS audio for a single segment (intro/outro).
343: 
344:         Args:
345:             text: Text to convert to speech
346: 
347:         Returns:
348:             MP3 audio bytes
349:         """
350:         response = self.openai_client.audio.speech.create(
351:             model=TTS_MODEL,
352:             voice=TTS_VOICE,
353:             input=text,
354:         )
355:         return response.content
356: 
357:     @staticmethod
358:     def _combine_intro_content_outro(
359:         intro_audio: bytes,
360:         content_audio: bytes,
361:         outro_audio: bytes,
362:     ) -> bytes:
363:         """Combine intro, content, and outro with crossfades.
364: 
365:         Args:
366:             intro_audio: MP3 bytes for intro
367:             content_audio: MP3 bytes for main content
368:             outro_audio: MP3 bytes for outro
369: 
370:         Returns:
371:             Combined MP3 audio bytes
372:         """
373:         # Load audio segments
374:         intro = AudioSegment.from_mp3(io.BytesIO(intro_audio))
375:         content = AudioSegment.from_mp3(io.BytesIO(content_audio))
376:         outro = AudioSegment.from_mp3(io.BytesIO(outro_audio))
377: 
378:         # Create bridge silence (pause + 2 * crossfade to account for overlap)
379:         bridge = AudioSegment.silent(
380:             duration=PAUSE_DURATION + 2 * CROSSFADE_DURATION
381:         )
382: 
383:         def safe_append(
384:             seg1: AudioSegment, seg2: AudioSegment, crossfade: int
385:         ) -> AudioSegment:
386:             if len(seg1) < crossfade or len(seg2) < crossfade:
387:                 logger.warning(
388:                     "Segment too short for crossfade (%dms vs %dms/%dms), using concatenation",
389:                     crossfade,
390:                     len(seg1),
391:                     len(seg2),
392:                 )
393:                 return seg1 + seg2
394:             return seg1.append(seg2, crossfade=crossfade)
395: 
396:         # Combine segments with crossfades
397:         # Intro -> Bridge -> Content -> Bridge -> Outro
398:         # This effectively fades out the previous segment and fades in the next one
399:         combined = safe_append(intro, bridge, CROSSFADE_DURATION)
400:         combined = safe_append(combined, content, CROSSFADE_DURATION)
401:         combined = safe_append(combined, bridge, CROSSFADE_DURATION)
402:         combined = safe_append(combined, outro, CROSSFADE_DURATION)
403: 
404:         # Export to bytes
405:         output = io.BytesIO()
406:         combined.export(output, format="mp3")
407:         return output.getvalue()
408: 
409:     def _process_tts_chunk(self, chunk: str, index: int) -> bytes:
410:         """Process a single TTS chunk.
411: 
412:         Args:
413:             chunk: Text to convert to speech
414:             index: Chunk index for logging
415: 
416:         Returns:
417:             Audio data as bytes
418:         """
419:         logger.info(
420:             "Generating TTS for chunk %d (%d chars)",
421:             index + 1,
422:             len(chunk),
423:         )
424: 
425:         response = self.openai_client.audio.speech.create(
426:             model=TTS_MODEL,
427:             voice=TTS_VOICE,
428:             input=chunk,
429:             response_format="mp3",
430:         )
431: 
432:         return response.content
433: 
434:     @staticmethod
435:     def _combine_audio_chunks(audio_chunks: list[bytes]) -> bytes:
436:         """Combine multiple audio chunks with silence gaps.
437: 
438:         Args:
439:             audio_chunks: List of audio data in order
440: 
441:         Returns:
442:             Combined audio data
443:         """
444:         if not audio_chunks:
445:             msg = "No audio chunks to combine"
446:             raise ValueError(msg)
447: 
448:         # Create a temporary file for the combined audio
449:         with tempfile.NamedTemporaryFile(
450:             suffix=".mp3",
451:             delete=False,
452:         ) as temp_file:
453:             temp_path = temp_file.name
454: 
455:         try:
456:             # Start with the first chunk
457:             combined_audio = AudioSegment.from_mp3(io.BytesIO(audio_chunks[0]))
458: 
459:             # Add remaining chunks with silence gaps
460:             for chunk_data in audio_chunks[1:]:
461:                 chunk_audio = AudioSegment.from_mp3(io.BytesIO(chunk_data))
462:                 silence = AudioSegment.silent(duration=300)  # 300ms gap
463:                 combined_audio = combined_audio + silence + chunk_audio
464: 
465:             # Export to file
466:             combined_audio.export(temp_path, format="mp3", bitrate="128k")
467: 
468:             # Read back the combined audio
469:             return Path(temp_path).read_bytes()
470: 
471:         finally:
472:             # Clean up temp file
473:             if Path(temp_path).exists():
474:                 Path(temp_path).unlink()
475: 
476:     def _text_to_speech_serial(self, chunks: list[str]) -> bytes:
477:         """Fallback serial processing for high memory situations.
478: 
479:         This is the original serial implementation.
480:         """
481:         # Create a temporary file for streaming audio concatenation
482:         with tempfile.NamedTemporaryFile(
483:             suffix=".mp3",
484:             delete=False,
485:         ) as temp_file:
486:             temp_path = temp_file.name
487: 
488:         try:
489:             # Process first chunk
490:             logger.info("Generating TTS for chunk 1/%d", len(chunks))
491:             response = self.openai_client.audio.speech.create(
492:                 model=TTS_MODEL,
493:                 voice=TTS_VOICE,
494:                 input=chunks[0],
495:                 response_format="mp3",
496:             )
497: 
498:             # Write first chunk directly to file
499:             Path(temp_path).write_bytes(response.content)
500: 
501:             # Process remaining chunks
502:             for i, chunk in enumerate(chunks[1:], 1):
503:                 logger.info(
504:                     "Generating TTS for chunk %d/%d (%d chars)",
505:                     i + 1,
506:                     len(chunks),
507:                     len(chunk),
508:                 )
509: 
510:                 response = self.openai_client.audio.speech.create(
511:                     model=TTS_MODEL,
512:                     voice=TTS_VOICE,
513:                     input=chunk,
514:                     response_format="mp3",
515:                 )
516: 
517:                 # Append to existing file with silence gap
518:                 # Load only the current segment
519:                 current_segment = AudioSegment.from_mp3(
520:                     io.BytesIO(response.content),
521:                 )
522: 
523:                 # Load existing audio, append, and save back
524:                 existing_audio = AudioSegment.from_mp3(temp_path)
525:                 silence = AudioSegment.silent(duration=300)
526:                 combined = existing_audio + silence + current_segment
527: 
528:                 # Export back to the same file
529:                 combined.export(temp_path, format="mp3", bitrate="128k")
530: 
531:                 # Force garbage collection to free memory
532:                 del existing_audio, current_segment, combined
533: 
534:                 # Small delay between API calls
535:                 if i < len(chunks) - 1:
536:                     time.sleep(0.5)
537: 
538:             # Read final result
539:             audio_data = Path(temp_path).read_bytes()
540: 
541:             logger.info(
542:                 "Generated combined TTS audio: %d bytes",
543:                 len(audio_data),
544:             )
545:             return audio_data
546: 
547:         finally:
548:             # Clean up temp file
549:             temp_file_path = Path(temp_path)
550:             if temp_file_path.exists():
551:                 temp_file_path.unlink()
552: 
553:     def upload_to_s3(self, audio_data: bytes, filename: str) -> str:
554:         """Upload audio file to S3-compatible storage and return public URL.
555: 
556:         Raises:
557:             ValueError: If S3 client is not configured.
558:             ClientError: If S3 upload fails.
559:         """
560:         if not self.s3_client:
561:             msg = "S3 client not configured"
562:             raise ValueError(msg)
563: 
564:         try:
565:             # Upload file using streaming to minimize memory usage
566:             audio_stream = io.BytesIO(audio_data)
567:             self.s3_client.upload_fileobj(
568:                 audio_stream,
569:                 S3_BUCKET,
570:                 filename,
571:                 ExtraArgs={
572:                     "ContentType": "audio/mpeg",
573:                     "ACL": "public-read",
574:                 },
575:             )
576: 
577:             # Construct public URL
578:             audio_url = f"{S3_ENDPOINT}/{S3_BUCKET}/{filename}"
579:             logger.info(
580:                 "Uploaded audio to: %s (%d bytes)",
581:                 audio_url,
582:                 len(audio_data),
583:             )
584:         except ClientError:
585:             logger.exception("S3 upload failed")
586:             raise
587:         else:
588:             return audio_url
589: 
590:     @staticmethod
591:     def estimate_duration(audio_data: bytes) -> int:
592:         """Estimate audio duration in seconds based on file size and bitrate."""
593:         # Rough estimation: MP3 at 128kbps = ~16KB per second
594:         estimated_seconds = len(audio_data) // 16000
595:         return max(1, estimated_seconds)  # Minimum 1 second
596: 
597:     @staticmethod
598:     def generate_filename(job_id: int, title: str) -> str:
599:         """Generate unique filename for audio file."""
600:         timestamp = int(datetime.now(tz=timezone.utc).timestamp())
601:         # Create safe filename from title
602:         safe_title = "".join(
603:             c for c in title if c.isalnum() or c in {" ", "-", "_"}
604:         ).rstrip()
605:         safe_title = safe_title.replace(" ", "_")[:50]  # Limit length
606:         return f"episode_{timestamp}_{job_id}_{safe_title}.mp3"
607: 
608:     def process_job(
609:         self,
610:         job: dict[str, Any],
611:     ) -> None:
612:         """Process a single job through the complete pipeline."""
613:         job_id = job["id"]
614:         url = job["url"]
615: 
616:         # Check memory before starting
617:         mem_usage = check_memory_usage()
618:         if mem_usage > MEMORY_THRESHOLD:
619:             logger.warning(
620:                 "High memory usage (%.1f%%), deferring job %d",
621:                 mem_usage,
622:                 job_id,
623:             )
624:             return
625: 
626:         # Track current job for graceful shutdown
627:         self.shutdown_handler.set_current_job(job_id)
628: 
629:         try:
630:             logger.info("Processing job %d: %s", job_id, url)
631: 
632:             # Update status to processing
633:             Core.Database.update_job_status(
634:                 job_id,
635:                 "processing",
636:             )
637: 
638:             # Check for shutdown before each major step
639:             if self.shutdown_handler.is_shutdown_requested():
640:                 logger.info("Shutdown requested, aborting job %d", job_id)
641:                 Core.Database.update_job_status(job_id, "pending")
642:                 return
643: 
644:             # Step 1: Extract article content
645:             Core.Database.update_job_status(job_id, "extracting")
646:             title, content, author, pub_date = (
647:                 ArticleProcessor.extract_article_content(url)
648:             )
649: 
650:             if self.shutdown_handler.is_shutdown_requested():
651:                 logger.info("Shutdown requested, aborting job %d", job_id)
652:                 Core.Database.update_job_status(job_id, "pending")
653:                 return
654: 
655:             # Step 2: Generate audio with metadata
656:             Core.Database.update_job_status(job_id, "synthesizing")
657:             audio_data = self.text_to_speech(content, title, author, pub_date)
658: 
659:             if self.shutdown_handler.is_shutdown_requested():
660:                 logger.info("Shutdown requested, aborting job %d", job_id)
661:                 Core.Database.update_job_status(job_id, "pending")
662:                 return
663: 
664:             # Step 3: Upload to S3
665:             Core.Database.update_job_status(job_id, "uploading")
666:             filename = ArticleProcessor.generate_filename(job_id, title)
667:             audio_url = self.upload_to_s3(audio_data, filename)
668: 
669:             # Step 4: Calculate duration
670:             duration = ArticleProcessor.estimate_duration(audio_data)
671: 
672:             # Step 5: Create episode record
673:             url_hash = Core.hash_url(url)
674:             episode_id = Core.Database.create_episode(
675:                 title=title,
676:                 audio_url=audio_url,
677:                 duration=duration,
678:                 content_length=len(content),
679:                 user_id=job.get("user_id"),
680:                 author=job.get("author"),  # Pass author from job
681:                 original_url=url,  # Pass the original article URL
682:                 original_url_hash=url_hash,
683:             )
684: 
685:             # Add episode to user's feed
686:             user_id = job.get("user_id")
687:             if user_id:
688:                 Core.Database.add_episode_to_user(user_id, episode_id)
689:                 Core.Database.track_episode_event(
690:                     episode_id,
691:                     "added",
692:                     user_id,
693:                 )
694: 
695:             # Step 6: Mark job as complete
696:             Core.Database.update_job_status(
697:                 job_id,
698:                 "completed",
699:             )
700: 
701:             logger.info(
702:                 "Successfully processed job %d -> episode %d",
703:                 job_id,
704:                 episode_id,
705:             )
706: 
707:         except Exception as e:
708:             error_msg = str(e)
709:             logger.exception("Job %d failed: %s", job_id, error_msg)
710:             Core.Database.update_job_status(
711:                 job_id,
712:                 "error",
713:                 error_msg,
714:             )
715:             raise
716:         finally:
717:             # Clear current job
718:             self.shutdown_handler.set_current_job(None)
719: 
720: 
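The bridge silence in `_combine_intro_content_outro` is sized as `PAUSE_DURATION + 2 * CROSSFADE_DURATION` because each `append(..., crossfade=c)` consumes `c` ms of overlap. The arithmetic can be checked in isolation; a minimal sketch with illustrative values (not the module's actual constants):

```python
PAUSE, CROSSFADE = 500, 100  # illustrative values, not the module constants

def appended_length(a: int, b: int, crossfade: int) -> int:
    # pydub's seg1.append(seg2, crossfade=c) overlaps c ms of the two
    # segments, so the combined length is len(seg1) + len(seg2) - c.
    return a + b - crossfade

bridge = PAUSE + 2 * CROSSFADE  # 700 ms of silence between two segments
intro, content = 3_000, 60_000  # example segment lengths in ms

# Crossfading into the bridge and out of it eats CROSSFADE ms twice,
# leaving exactly PAUSE ms of audible gap:
total = appended_length(
    appended_length(intro, bridge, CROSSFADE), content, CROSSFADE
)
assert total == intro + content + PAUSE
```

This is why the extra `2 * CROSSFADE_DURATION` is baked into the bridge: without it, each crossfade would shave time off the intended pause.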
💭 Thought [engineer] · 3 months ago

Now let me read the test classes for ArticleProcessor:

Read Biz/PodcastItLater/Worker.py @1193-1707
515 lines
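The parallel-TTS tests below exercise the submit/`as_completed`/sort-by-index pattern from `text_to_speech`. Isolated from the TTS details, the order-preserving core looks like this (a stdlib-only sketch; `process_in_order` is an illustrative name, not from the codebase):

```python
import concurrent.futures

def process_in_order(items, fn, max_workers=4):
    """Run fn over items in parallel, but return results in input order."""
    indexed = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as ex:
        # Map each future back to its input index, as text_to_speech does.
        future_to_index = {ex.submit(fn, item): i for i, item in enumerate(items)}
        # as_completed yields in completion order, which is arbitrary...
        for future in concurrent.futures.as_completed(future_to_index):
            indexed.append((future_to_index[future], future.result()))
    # ...so sort by the original index before discarding it.
    indexed.sort(key=lambda pair: pair[0])
    return [value for _, value in indexed]

# Results come back in submission order regardless of which finished first:
assert process_in_order(["a", "bb", "ccc"], len) == [1, 2, 3]
```

Any worker exception surfaces from `future.result()`, which is why the production code wraps that call and re-raises after logging the failed chunk index.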
1193: class TestTextToSpeech(Test.TestCase):
1194:     """Test text-to-speech functionality."""
1195: 
1196:     def setUp(self) -> None:
1197:         """Set up mocks."""
1198:         # Mock OpenAI API key
1199:         self.env_patcher = unittest.mock.patch.dict(
1200:             os.environ,
1201:             {"OPENAI_API_KEY": "test-key"},
1202:         )
1203:         self.env_patcher.start()
1204: 
1205:         # Mock OpenAI response
1206:         self.mock_audio_response: unittest.mock.MagicMock = (
1207:             unittest.mock.MagicMock()
1208:         )
1209:         self.mock_audio_response.content = b"fake-audio-data"
1210: 
1211:         # Mock AudioSegment to avoid ffmpeg issues in tests
1212:         self.mock_audio_segment: unittest.mock.MagicMock = (
1213:             unittest.mock.MagicMock()
1214:         )
1215:         self.mock_audio_segment.export.return_value = None
1216:         self.audio_segment_patcher = unittest.mock.patch(
1217:             "pydub.AudioSegment.from_mp3",
1218:             return_value=self.mock_audio_segment,
1219:         )
1220:         self.audio_segment_patcher.start()
1221: 
1222:         # Mock the concatenation operations
1223:         self.mock_audio_segment.__add__.return_value = self.mock_audio_segment
1224: 
1225:     def tearDown(self) -> None:
1226:         """Clean up mocks."""
1227:         self.env_patcher.stop()
1228:         self.audio_segment_patcher.stop()
1229: 
1230:     def test_tts_generation(self) -> None:
1231:         """Generate audio from text."""
1232: 
1233:         # Mock the export to write test audio data
1234:         def mock_export(buffer: io.BytesIO, **_kwargs: typing.Any) -> None:
1235:             buffer.write(b"test-audio-output")
1236:             buffer.seek(0)
1237: 
1238:         self.mock_audio_segment.export.side_effect = mock_export
1239: 
1240:         # Mock OpenAI client
1241:         mock_client = unittest.mock.MagicMock()
1242:         mock_audio = unittest.mock.MagicMock()
1243:         mock_speech = unittest.mock.MagicMock()
1244:         mock_speech.create.return_value = self.mock_audio_response
1245:         mock_audio.speech = mock_speech
1246:         mock_client.audio = mock_audio
1247: 
1248:         with (
1249:             unittest.mock.patch("openai.OpenAI", return_value=mock_client),
1250:             unittest.mock.patch(
1251:                 "Biz.PodcastItLater.Worker.prepare_text_for_tts",
1252:                 return_value=["Test content"],
1253:             ),
1254:         ):
1255:             shutdown_handler = ShutdownHandler()
1256:             processor = ArticleProcessor(shutdown_handler)
1257:             audio_data = processor.text_to_speech(
1258:                 "Test content",
1259:                 "Test Title",
1260:             )
1261: 
1262:         self.assertIsInstance(audio_data, bytes)
1263:         self.assertEqual(audio_data, b"test-audio-output")
1264: 
1265:     def test_tts_chunking(self) -> None:
1266:         """Handle long articles with chunking."""
1267:         long_text = "Long content " * 1000
1268:         chunks = ["Chunk 1", "Chunk 2", "Chunk 3"]
1269: 
1270:         def mock_export(buffer: io.BytesIO, **_kwargs: typing.Any) -> None:
1271:             buffer.write(b"test-audio-output")
1272:             buffer.seek(0)
1273: 
1274:         self.mock_audio_segment.export.side_effect = mock_export
1275: 
1276:         # AudioSegment.silent is mocked in the with-block below
1277:         # Mock OpenAI client
1278:         mock_client = unittest.mock.MagicMock()
1279:         mock_audio = unittest.mock.MagicMock()
1280:         mock_speech = unittest.mock.MagicMock()
1281:         mock_speech.create.return_value = self.mock_audio_response
1282:         mock_audio.speech = mock_speech
1283:         mock_client.audio = mock_audio
1284: 
1285:         with (
1286:             unittest.mock.patch("openai.OpenAI", return_value=mock_client),
1287:             unittest.mock.patch(
1288:                 "Biz.PodcastItLater.Worker.prepare_text_for_tts",
1289:                 return_value=chunks,
1290:             ),
1291:             unittest.mock.patch(
1292:                 "pydub.AudioSegment.silent",
1293:                 return_value=self.mock_audio_segment,
1294:             ),
1295:         ):
1296:             shutdown_handler = ShutdownHandler()
1297:             processor = ArticleProcessor(shutdown_handler)
1298:             audio_data = processor.text_to_speech(
1299:                 long_text,
1300:                 "Long Article",
1301:             )
1302: 
1303:         # Should have called TTS for each chunk
1304:         self.assertIsInstance(audio_data, bytes)
1305:         self.assertEqual(audio_data, b"test-audio-output")
1306: 
1307:     def test_tts_empty_text(self) -> None:
1308:         """Handle empty input."""
1309:         with unittest.mock.patch(
1310:             "Biz.PodcastItLater.Worker.prepare_text_for_tts",
1311:             return_value=[],
1312:         ):
1313:             shutdown_handler = ShutdownHandler()
1314:             processor = ArticleProcessor(shutdown_handler)
1315:             with pytest.raises(ValueError, match="No chunks generated") as cm:
1316:                 processor.text_to_speech("", "Empty")
1317: 
1318:         self.assertIn("No chunks generated", str(cm.value))
1319: 
1320:     def test_tts_special_characters(self) -> None:
1321:         """Handle unicode and special chars."""
1322:         special_text = 'Unicode: 你好世界 Émojis: 🎙️📰 Special: <>&"'
1323: 
1324:         def mock_export(buffer: io.BytesIO, **_kwargs: typing.Any) -> None:
1325:             buffer.write(b"test-audio-output")
1326:             buffer.seek(0)
1327: 
1328:         self.mock_audio_segment.export.side_effect = mock_export
1329: 
1330:         # Mock OpenAI client
1331:         mock_client = unittest.mock.MagicMock()
1332:         mock_audio = unittest.mock.MagicMock()
1333:         mock_speech = unittest.mock.MagicMock()
1334:         mock_speech.create.return_value = self.mock_audio_response
1335:         mock_audio.speech = mock_speech
1336:         mock_client.audio = mock_audio
1337: 
1338:         with (
1339:             unittest.mock.patch("openai.OpenAI", return_value=mock_client),
1340:             unittest.mock.patch(
1341:                 "Biz.PodcastItLater.Worker.prepare_text_for_tts",
1342:                 return_value=[special_text],
1343:             ),
1344:         ):
1345:             shutdown_handler = ShutdownHandler()
1346:             processor = ArticleProcessor(shutdown_handler)
1347:             audio_data = processor.text_to_speech(
1348:                 special_text,
1349:                 "Special",
1350:             )
1351: 
1352:         self.assertIsInstance(audio_data, bytes)
1353:         self.assertEqual(audio_data, b"test-audio-output")
1354: 
1355:     def test_llm_text_preparation(self) -> None:
1356:         """Verify LLM editing."""
1357:         # Test the actual text preparation functions
1358:         chunks = split_text_into_chunks("Short text", max_chars=100)
1359:         self.assertEqual(len(chunks), 1)
1360:         self.assertEqual(chunks[0], "Short text")
1361: 
1362:         # Test long text splitting
1363:         long_text = "Sentence one. " * 100
1364:         chunks = split_text_into_chunks(long_text, max_chars=100)
1365:         self.assertGreater(len(chunks), 1)
1366:         for chunk in chunks:
1367:             self.assertLessEqual(len(chunk), 100)
1368: 
1369:     @staticmethod
1370:     def test_llm_failure_fallback() -> None:
1371:         """Handle LLM API failures."""
1372:         # Mock LLM failure
1373:         with unittest.mock.patch("openai.OpenAI") as mock_openai:
1374:             mock_client = mock_openai.return_value
1375:             mock_client.chat.completions.create.side_effect = Exception(
1376:                 "API Error",
1377:             )
1378: 
1379:             # The failure currently propagates; there is no fallback to raw text
1380:             with pytest.raises(Exception, match="API Error"):
1381:                 edit_chunk_for_speech("Test chunk", "Title", is_first=True)
1382: 
1383:     def test_chunk_concatenation(self) -> None:
1384:         """Verify audio joining."""
1385:         # Mock multiple audio segments
1386:         chunks = ["Chunk 1", "Chunk 2"]
1387: 
1388:         def mock_export(buffer: io.BytesIO, **_kwargs: typing.Any) -> None:
1389:             buffer.write(b"test-audio-output")
1390:             buffer.seek(0)
1391: 
1392:         self.mock_audio_segment.export.side_effect = mock_export
1393: 
1394:         # Mock OpenAI client
1395:         mock_client = unittest.mock.MagicMock()
1396:         mock_audio = unittest.mock.MagicMock()
1397:         mock_speech = unittest.mock.MagicMock()
1398:         mock_speech.create.return_value = self.mock_audio_response
1399:         mock_audio.speech = mock_speech
1400:         mock_client.audio = mock_audio
1401: 
1402:         with (
1403:             unittest.mock.patch("openai.OpenAI", return_value=mock_client),
1404:             unittest.mock.patch(
1405:                 "Biz.PodcastItLater.Worker.prepare_text_for_tts",
1406:                 return_value=chunks,
1407:             ),
1408:             unittest.mock.patch(
1409:                 "pydub.AudioSegment.silent",
1410:                 return_value=self.mock_audio_segment,
1411:             ),
1412:         ):
1413:             shutdown_handler = ShutdownHandler()
1414:             processor = ArticleProcessor(shutdown_handler)
1415:             audio_data = processor.text_to_speech("Test", "Title")
1416: 
1417:         # Should produce combined audio
1418:         self.assertIsInstance(audio_data, bytes)
1419:         self.assertEqual(audio_data, b"test-audio-output")
1420: 
1421:     def test_parallel_tts_generation(self) -> None:
1422:         """Test parallel TTS processing."""
1423:         chunks = ["Chunk 1", "Chunk 2", "Chunk 3", "Chunk 4"]
1424: 
1425:         # Mock responses for each chunk
1426:         mock_responses = []
1427:         for i in range(len(chunks)):
1428:             mock_resp = unittest.mock.MagicMock()
1429:             mock_resp.content = f"audio-{i}".encode()
1430:             mock_responses.append(mock_resp)
1431: 
1432:         # Mock OpenAI client
1433:         mock_client = unittest.mock.MagicMock()
1434:         mock_audio = unittest.mock.MagicMock()
1435:         mock_speech = unittest.mock.MagicMock()
1436: 
1437:         # Make create return different responses for each call
1438:         mock_speech.create.side_effect = mock_responses
1439:         mock_audio.speech = mock_speech
1440:         mock_client.audio = mock_audio
1441: 
1442:         # Mock AudioSegment operations
1443:         mock_segment = unittest.mock.MagicMock()
1444:         mock_segment.__add__.return_value = mock_segment
1445: 
1446:         def mock_export(path: str, **_kwargs: typing.Any) -> None:
1447:             Path(path).write_bytes(b"combined-audio")
1448: 
1449:         mock_segment.export = mock_export
1450: 
1451:         with (
1452:             unittest.mock.patch("openai.OpenAI", return_value=mock_client),
1453:             unittest.mock.patch(
1454:                 "Biz.PodcastItLater.Worker.prepare_text_for_tts",
1455:                 return_value=chunks,
1456:             ),
1457:             unittest.mock.patch(
1458:                 "pydub.AudioSegment.from_mp3",
1459:                 return_value=mock_segment,
1460:             ),
1461:             unittest.mock.patch(
1462:                 "pydub.AudioSegment.silent",
1463:                 return_value=mock_segment,
1464:             ),
1465:             unittest.mock.patch(
1466:                 "Biz.PodcastItLater.Worker.check_memory_usage",
1467:                 return_value=50.0,  # Normal memory usage
1468:             ),
1469:         ):
1470:             shutdown_handler = ShutdownHandler()
1471:             processor = ArticleProcessor(shutdown_handler)
1472:             audio_data = processor.text_to_speech("Test content", "Test Title")
1473: 
1474:         # Verify all chunks were processed
1475:         self.assertEqual(mock_speech.create.call_count, len(chunks))
1476:         self.assertEqual(audio_data, b"combined-audio")
1477: 
1478:     def test_parallel_tts_high_memory_fallback(self) -> None:
1479:         """Test fallback to serial processing when memory is high."""
1480:         chunks = ["Chunk 1", "Chunk 2"]
1481: 
1482:         def mock_export(buffer: io.BytesIO, **_kwargs: typing.Any) -> None:
1483:             buffer.write(b"serial-audio")
1484:             buffer.seek(0)
1485: 
1486:         self.mock_audio_segment.export.side_effect = mock_export
1487: 
1488:         # Mock OpenAI client
1489:         mock_client = unittest.mock.MagicMock()
1490:         mock_audio = unittest.mock.MagicMock()
1491:         mock_speech = unittest.mock.MagicMock()
1492:         mock_speech.create.return_value = self.mock_audio_response
1493:         mock_audio.speech = mock_speech
1494:         mock_client.audio = mock_audio
1495: 
1496:         with (
1497:             unittest.mock.patch("openai.OpenAI", return_value=mock_client),
1498:             unittest.mock.patch(
1499:                 "Biz.PodcastItLater.Worker.prepare_text_for_tts",
1500:                 return_value=chunks,
1501:             ),
1502:             unittest.mock.patch(
1503:                 "Biz.PodcastItLater.Worker.check_memory_usage",
1504:                 return_value=65.0,  # High memory usage
1505:             ),
1506:             unittest.mock.patch(
1507:                 "pydub.AudioSegment.silent",
1508:                 return_value=self.mock_audio_segment,
1509:             ),
1510:         ):
1511:             shutdown_handler = ShutdownHandler()
1512:             processor = ArticleProcessor(shutdown_handler)
1513:             audio_data = processor.text_to_speech("Test content", "Test Title")
1514: 
1515:         # Should use serial processing
1516:         self.assertEqual(audio_data, b"serial-audio")
1517: 
1518:     @staticmethod
1519:     def test_parallel_tts_error_handling() -> None:
1520:         """Test error handling in parallel TTS processing."""
1521:         chunks = ["Chunk 1", "Chunk 2"]
1522: 
1523:         # Mock OpenAI client with one failure
1524:         mock_client = unittest.mock.MagicMock()
1525:         mock_audio = unittest.mock.MagicMock()
1526:         mock_speech = unittest.mock.MagicMock()
1527: 
1528:         # First call succeeds, second fails
1529:         mock_resp1 = unittest.mock.MagicMock()
1530:         mock_resp1.content = b"audio-1"
1531:         mock_speech.create.side_effect = [mock_resp1, Exception("API Error")]
1532: 
1533:         mock_audio.speech = mock_speech
1534:         mock_client.audio = mock_audio
1535: 
1536:         # Set up the test context
1537:         shutdown_handler = ShutdownHandler()
1538:         processor = ArticleProcessor(shutdown_handler)
1539: 
1540:         with (
1541:             unittest.mock.patch("openai.OpenAI", return_value=mock_client),
1542:             unittest.mock.patch(
1543:                 "Biz.PodcastItLater.Worker.prepare_text_for_tts",
1544:                 return_value=chunks,
1545:             ),
1546:             unittest.mock.patch(
1547:                 "Biz.PodcastItLater.Worker.check_memory_usage",
1548:                 return_value=50.0,
1549:             ),
1550:             pytest.raises(Exception, match="API Error"),
1551:         ):
1552:             processor.text_to_speech("Test content", "Test Title")
1553: 
1554:     def test_parallel_tts_order_preservation(self) -> None:
1555:         """Test that chunks are combined in the correct order."""
1556:         chunks = ["First", "Second", "Third", "Fourth", "Fifth"]
1557: 
1558:         # Create mock responses with identifiable content
1559:         mock_responses = []
1560:         for chunk in chunks:
1561:             mock_resp = unittest.mock.MagicMock()
1562:             mock_resp.content = f"audio-{chunk}".encode()
1563:             mock_responses.append(mock_resp)
1564: 
1565:         # Mock OpenAI client
1566:         mock_client = unittest.mock.MagicMock()
1567:         mock_audio = unittest.mock.MagicMock()
1568:         mock_speech = unittest.mock.MagicMock()
1569:         mock_speech.create.side_effect = mock_responses
1570:         mock_audio.speech = mock_speech
1571:         mock_client.audio = mock_audio
1572: 
1573:         # Track the order of segments being combined
1574:         combined_order = []
1575: 
1576:         def mock_from_mp3(data: io.BytesIO) -> unittest.mock.MagicMock:
1577:             content = data.read()
1578:             combined_order.append(content.decode())
1579:             segment = unittest.mock.MagicMock()
1580:             segment.__add__.return_value = segment
1581:             return segment
1582: 
1583:         mock_segment = unittest.mock.MagicMock()
1584:         mock_segment.__add__.return_value = mock_segment
1585: 
1586:         def mock_export(path: str, **_kwargs: typing.Any) -> None:
1587:             # Verify order is preserved
1588:             expected_order = [f"audio-{chunk}" for chunk in chunks]
1589:             if combined_order != expected_order:
1590:                 msg = f"Order mismatch: {combined_order} != {expected_order}"
1591:                 raise AssertionError(msg)
1592:             Path(path).write_bytes(b"ordered-audio")
1593: 
1594:         mock_segment.export = mock_export
1595: 
1596:         with (
1597:             unittest.mock.patch("openai.OpenAI", return_value=mock_client),
1598:             unittest.mock.patch(
1599:                 "Biz.PodcastItLater.Worker.prepare_text_for_tts",
1600:                 return_value=chunks,
1601:             ),
1602:             unittest.mock.patch(
1603:                 "pydub.AudioSegment.from_mp3",
1604:                 side_effect=mock_from_mp3,
1605:             ),
1606:             unittest.mock.patch(
1607:                 "pydub.AudioSegment.silent",
1608:                 return_value=mock_segment,
1609:             ),
1610:             unittest.mock.patch(
1611:                 "Biz.PodcastItLater.Worker.check_memory_usage",
1612:                 return_value=50.0,
1613:             ),
1614:         ):
1615:             shutdown_handler = ShutdownHandler()
1616:             processor = ArticleProcessor(shutdown_handler)
1617:             audio_data = processor.text_to_speech("Test content", "Test Title")
1618: 
1619:         self.assertEqual(audio_data, b"ordered-audio")
1620: 
1621: 
1622: class TestIntroOutro(Test.TestCase):
1623:     """Test intro and outro generation with metadata."""
1624: 
1625:     def test_create_intro_text_full_metadata(self) -> None:
1626:         """Test intro text creation with all metadata."""
1627:         intro = ArticleProcessor._create_intro_text(  # noqa: SLF001
1628:             title="Test Article",
1629:             author="John Doe",
1630:             pub_date="2024-01-15",
1631:         )
1632:         self.assertIn("Title: Test Article", intro)
1633:         self.assertIn("Author: John Doe", intro)
1634:         self.assertIn("Published: 2024-01-15", intro)
1635: 
1636:     def test_create_intro_text_no_author(self) -> None:
1637:         """Test intro text without author."""
1638:         intro = ArticleProcessor._create_intro_text(  # noqa: SLF001
1639:             title="Test Article",
1640:             author=None,
1641:             pub_date="2024-01-15",
1642:         )
1643:         self.assertIn("Title: Test Article", intro)
1644:         self.assertNotIn("Author:", intro)
1645:         self.assertIn("Published: 2024-01-15", intro)
1646: 
1647:     def test_create_intro_text_minimal(self) -> None:
1648:         """Test intro text with only title."""
1649:         intro = ArticleProcessor._create_intro_text(  # noqa: SLF001
1650:             title="Test Article",
1651:             author=None,
1652:             pub_date=None,
1653:         )
1654:         self.assertEqual(intro, "Title: Test Article.")
1655: 
1656:     def test_create_outro_text_with_author(self) -> None:
1657:         """Test outro text with author."""
1658:         outro = ArticleProcessor._create_outro_text(  # noqa: SLF001
1659:             title="Test Article",
1660:             author="Jane Smith",
1661:         )
1662:         self.assertIn("Test Article", outro)
1663:         self.assertIn("Jane Smith", outro)
1664:         self.assertIn("Podcast It Later", outro)
1665: 
1666:     def test_create_outro_text_no_author(self) -> None:
1667:         """Test outro text without author."""
1668:         outro = ArticleProcessor._create_outro_text(  # noqa: SLF001
1669:             title="Test Article",
1670:             author=None,
1671:         )
1672:         self.assertIn("Test Article", outro)
1673:         self.assertNotIn("by", outro)
1674:         self.assertIn("Podcast It Later", outro)
1675: 
1676:     def test_extract_with_metadata(self) -> None:
1677:         """Test that extraction returns metadata."""
1678:         mock_html = "<html><body><p>Content</p></body></html>"
1679:         mock_result = json.dumps({
1680:             "title": "Test Article",
1681:             "text": "Article content",
1682:             "author": "Test Author",
1683:             "date": "2024-01-15",
1684:         })
1685: 
1686:         with (
1687:             unittest.mock.patch(
1688:                 "trafilatura.fetch_url",
1689:                 return_value=mock_html,
1690:             ),
1691:             unittest.mock.patch(
1692:                 "trafilatura.extract",
1693:                 return_value=mock_result,
1694:             ),
1695:         ):
1696:             title, content, author, pub_date = (
1697:                 ArticleProcessor.extract_article_content(
1698:                     "https://example.com",
1699:                 )
1700:             )
1701: 
1702:         self.assertEqual(title, "Test Article")
1703:         self.assertEqual(content, "Article content")
1704:         self.assertEqual(author, "Test Author")
1705:         self.assertEqual(pub_date, "2024-01-15")
1706: 
1707: 
💭Thought[engineer]3 months ago

Now let me read the TestTextChunking class:

Read Biz/PodcastItLater/Worker.py @2156-2178
23 lines
2156: class TestTextChunking(Test.TestCase):
2157:     """Test text chunking edge cases."""
2158: 
2159:     def test_split_text_single_long_word(self) -> None:
2160:         """Handle text with a single word exceeding limit."""
2161:         long_word = "a" * 4000
2162:         chunks = split_text_into_chunks(long_word, max_chars=3000)
2163: 
2164:         # Should keep it as one chunk or split?
2165:         # The current implementation does not split words
2166:         self.assertEqual(len(chunks), 1)
2167:         self.assertEqual(len(chunks[0]), 4000)
2168: 
2169:     def test_split_text_no_sentence_boundaries(self) -> None:
2170:         """Handle long text with no sentence boundaries."""
2171:         text = "word " * 1000  # 5000 chars
2172:         chunks = split_text_into_chunks(text, max_chars=3000)
2173: 
2174:         # Should keep it as one chunk as it can't split by ". "
2175:         self.assertEqual(len(chunks), 1)
2176:         self.assertGreater(len(chunks[0]), 3000)
2177: 
2178: 
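These two edge cases imply a splitter that packs text greedily at sentence boundaries (`". "`) and never breaks inside a word: a single oversized word or an unbroken run of words comes back as one over-limit chunk. A minimal sketch consistent with the tests (an assumed reconstruction, not the repo's actual `split_text_into_chunks`):

```python
def split_text_into_chunks(text: str, max_chars: int = 3000) -> list[str]:
    """Greedily pack sentences (split on '. ') into chunks of at most
    max_chars; a sentence longer than the limit is kept whole."""
    chunks: list[str] = []
    current = ""
    for sentence in text.split(". "):
        candidate = f"{current}. {sentence}" if current else sentence
        if len(candidate) <= max_chars:
            current = candidate
        else:
            if current:
                chunks.append(current)
            current = sentence  # may exceed max_chars: words are never split
    if current:
        chunks.append(current)
    return chunks
```

Keeping oversized chunks whole trades occasional over-limit TTS requests for never producing mid-word audio glitches.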
💭Thought[engineer]3 months ago

Now let me check TestJobProcessing and TestMemoryEfficiency and TestWorkerErrorHandling:

Read Biz/PodcastItLater/Worker.py @1708-1850
143 lines
1708: class TestMemoryEfficiency(Test.TestCase):
1709:     """Test memory-efficient processing."""
1710: 
1711:     def test_large_article_size_limit(self) -> None:
1712:         """Test that articles exceeding size limits are rejected."""
1713:         huge_text = "x" * (MAX_ARTICLE_SIZE + 1000)  # Exceed limit
1714: 
1715:         with (
1716:             unittest.mock.patch(
1717:                 "trafilatura.fetch_url",
1718:                 return_value=huge_text * 4,  # Simulate large HTML
1719:             ),
1720:             pytest.raises(ValueError, match="Article too large") as cm,
1721:         ):
1722:             ArticleProcessor.extract_article_content("https://example.com")
1723: 
1724:         self.assertIn("Article too large", str(cm.value))
1725: 
1726:     def test_content_truncation(self) -> None:
1727:         """Test that oversized content is truncated."""
1728:         large_content = "Content " * 100_000  # Create large content
1729:         mock_result = json.dumps({
1730:             "title": "Large Article",
1731:             "text": large_content,
1732:         })
1733: 
1734:         with (
1735:             unittest.mock.patch(
1736:                 "trafilatura.fetch_url",
1737:                 return_value="<html><body>content</body></html>",
1738:             ),
1739:             unittest.mock.patch(
1740:                 "trafilatura.extract",
1741:                 return_value=mock_result,
1742:             ),
1743:         ):
1744:             title, content, _author, _pub_date = (
1745:                 ArticleProcessor.extract_article_content(
1746:                     "https://example.com",
1747:                 )
1748:             )
1749: 
1750:         self.assertEqual(title, "Large Article")
1751:         self.assertLessEqual(len(content), MAX_ARTICLE_SIZE)
1752: 
1753:     def test_memory_usage_check(self) -> None:
1754:         """Test memory usage monitoring."""
1755:         usage = check_memory_usage()
1756:         self.assertIsInstance(usage, float)
1757:         self.assertGreaterEqual(usage, 0.0)
1758:         self.assertLessEqual(usage, 100.0)
1759: 
1760: 
1761: class TestJobProcessing(Test.TestCase):
1762:     """Test job processing functionality."""
1763: 
1764:     def setUp(self) -> None:
1765:         """Set up test environment."""
1766:         Core.Database.init_db()
1767: 
1768:         # Create test user and job
1769:         self.user_id, _ = Core.Database.create_user(
1770:             "test@example.com",
1771:         )
1772:         self.job_id = Core.Database.add_to_queue(
1773:             "https://example.com/article",
1774:             "test@example.com",
1775:             self.user_id,
1776:         )
1777: 
1778:         # Mock environment
1779:         self.env_patcher = unittest.mock.patch.dict(
1780:             os.environ,
1781:             {
1782:                 "OPENAI_API_KEY": "test-key",
1783:                 "S3_ENDPOINT": "https://s3.example.com",
1784:                 "S3_BUCKET": "test-bucket",
1785:                 "S3_ACCESS_KEY": "test-access",
1786:                 "S3_SECRET_KEY": "test-secret",
1787:             },
1788:         )
1789:         self.env_patcher.start()
1790: 
1791:     def tearDown(self) -> None:
1792:         """Clean up."""
1793:         self.env_patcher.stop()
1794:         Core.Database.teardown()
1795: 
1796:     def test_process_job_success(self) -> None:
1797:         """Complete pipeline execution."""
1798:         shutdown_handler = ShutdownHandler()
1799:         processor = ArticleProcessor(shutdown_handler)
1800:         job = Core.Database.get_job_by_id(self.job_id)
1801:         if job is None:
1802:             msg = "no job found for %s"
1803:             raise Test.TestError(msg, self.job_id)
1804: 
1805:         # Mock all external calls
1806:         with (
1807:             unittest.mock.patch.object(
1808:                 ArticleProcessor,
1809:                 "extract_article_content",
1810:                 return_value=(
1811:                     "Test Title",
1812:                     "Test content",
1813:                     "Test Author",
1814:                     "2024-01-15",
1815:                 ),
1816:             ),
1817:             unittest.mock.patch.object(
1818:                 ArticleProcessor,
1819:                 "text_to_speech",
1820:                 return_value=b"audio-data",
1821:             ),
1822:             unittest.mock.patch.object(
1823:                 processor,
1824:                 "upload_to_s3",
1825:                 return_value="https://s3.example.com/audio.mp3",
1826:             ),
1827:             unittest.mock.patch(
1828:                 "Biz.PodcastItLater.Core.Database.update_job_status",
1829:             ) as mock_update,
1830:             unittest.mock.patch(
1831:                 "Biz.PodcastItLater.Core.Database.create_episode",
1832:             ) as mock_create,
1833:         ):
1834:             mock_create.return_value = 1
1835:             processor.process_job(job)
1836: 
1837:         # Verify job was marked complete
1838:         mock_update.assert_called_with(self.job_id, "completed")
1839:         mock_create.assert_called_once()
1840: 
1841:     def test_process_job_extraction_failure(self) -> None:
1842:         """Handle bad URLs."""
1843:         shutdown_handler = ShutdownHandler()
1844:         processor = ArticleProcessor(shutdown_handler)
1845:         job = Core.Database.get_job_by_id(self.job_id)
1846:         if job is None:
1847:             msg = "no job found for %s"
1848:             raise Test.TestError(msg, self.job_id)
1849: 
1850:         with (
💭Thought[engineer]3 months ago

Let me continue reading:

Read Biz/PodcastItLater/Worker.py @1850-2067
218 lines
1850:         with (
1851:             unittest.mock.patch.object(
1852:                 ArticleProcessor,
1853:                 "extract_article_content",
1854:                 side_effect=ValueError("Bad URL"),
1855:             ),
1856:             unittest.mock.patch(
1857:                 "Biz.PodcastItLater.Core.Database.update_job_status",
1858:             ) as mock_update,
1859:             pytest.raises(ValueError, match="Bad URL"),
1860:         ):
1861:             processor.process_job(job)
1862: 
1863:         # Job should be marked as error
1864:         mock_update.assert_called_with(self.job_id, "error", "Bad URL")
1865: 
1866:     def test_process_job_tts_failure(self) -> None:
1867:         """Handle TTS errors."""
1868:         shutdown_handler = ShutdownHandler()
1869:         processor = ArticleProcessor(shutdown_handler)
1870:         job = Core.Database.get_job_by_id(self.job_id)
1871:         if job is None:
1872:             msg = "no job found for %s"
1873:             raise Test.TestError(msg, self.job_id)
1874: 
1875:         with (
1876:             unittest.mock.patch.object(
1877:                 ArticleProcessor,
1878:                 "extract_article_content",
1879:                 return_value=("Title", "Content"),
1880:             ),
1881:             unittest.mock.patch.object(
1882:                 ArticleProcessor,
1883:                 "text_to_speech",
1884:                 side_effect=Exception("TTS Error"),
1885:             ),
1886:             unittest.mock.patch(
1887:                 "Biz.PodcastItLater.Core.Database.update_job_status",
1888:             ) as mock_update,
1889:             pytest.raises(Exception, match="TTS Error"),
1890:         ):
1891:             processor.process_job(job)
1892: 
1893:         mock_update.assert_called_with(self.job_id, "error", "TTS Error")
1894: 
1895:     def test_process_job_s3_failure(self) -> None:
1896:         """Handle upload errors."""
1897:         shutdown_handler = ShutdownHandler()
1898:         processor = ArticleProcessor(shutdown_handler)
1899:         job = Core.Database.get_job_by_id(self.job_id)
1900:         if job is None:
1901:             msg = "no job found for %s"
1902:             raise Test.TestError(msg, self.job_id)
1903: 
1904:         with (
1905:             unittest.mock.patch.object(
1906:                 ArticleProcessor,
1907:                 "extract_article_content",
1908:                 return_value=("Title", "Content"),
1909:             ),
1910:             unittest.mock.patch.object(
1911:                 ArticleProcessor,
1912:                 "text_to_speech",
1913:                 return_value=b"audio",
1914:             ),
1915:             unittest.mock.patch.object(
1916:                 processor,
1917:                 "upload_to_s3",
1918:                 side_effect=ClientError({}, "PutObject"),
1919:             ),
1920:             unittest.mock.patch(
1921:                 "Biz.PodcastItLater.Core.Database.update_job_status",
1922:             ),
1923:             pytest.raises(ClientError),
1924:         ):
1925:             processor.process_job(job)
1926: 
1927:     def test_job_retry_logic(self) -> None:
1928:         """Verify exponential backoff."""
1929:         # Set job to error with retry count
1930:         Core.Database.update_job_status(
1931:             self.job_id,
1932:             "error",
1933:             "First failure",
1934:         )
1935:         Core.Database.update_job_status(
1936:             self.job_id,
1937:             "error",
1938:             "Second failure",
1939:         )
1940: 
1941:         job = Core.Database.get_job_by_id(self.job_id)
1942:         if job is None:
1943:             msg = "no job found for %s"
1944:             raise Test.TestError(msg, self.job_id)
1945: 
1946:         self.assertEqual(job["retry_count"], 2)
1947: 
1948:         # Should be retryable
1949:         retryable = Core.Database.get_retryable_jobs(
1950:             max_retries=3,
1951:         )
1952:         self.assertEqual(len(retryable), 1)
1953: 
1954:     def test_max_retries(self) -> None:
1955:         """Stop after max attempts."""
1956:         # Exceed retry limit
1957:         for i in range(4):
1958:             Core.Database.update_job_status(
1959:                 self.job_id,
1960:                 "error",
1961:                 f"Failure {i}",
1962:             )
1963: 
1964:         # Should not be retryable
1965:         retryable = Core.Database.get_retryable_jobs(
1966:             max_retries=3,
1967:         )
1968:         self.assertEqual(len(retryable), 0)
1969: 
1970:     def test_graceful_shutdown(self) -> None:
1971:         """Test graceful shutdown during job processing."""
1972:         shutdown_handler = ShutdownHandler()
1973:         processor = ArticleProcessor(shutdown_handler)
1974:         job = Core.Database.get_job_by_id(self.job_id)
1975:         if job is None:
1976:             msg = "no job found for %s"
1977:             raise Test.TestError(msg, self.job_id)
1978: 
1979:         def mock_tts(*_args: Any) -> bytes:
1980:             shutdown_handler.shutdown_requested.set()
1981:             return b"audio-data"
1982: 
1983:         with (
1984:             unittest.mock.patch.object(
1985:                 ArticleProcessor,
1986:                 "extract_article_content",
1987:                 return_value=(
1988:                     "Test Title",
1989:                     "Test content",
1990:                     "Test Author",
1991:                     "2024-01-15",
1992:                 ),
1993:             ),
1994:             unittest.mock.patch.object(
1995:                 ArticleProcessor,
1996:                 "text_to_speech",
1997:                 side_effect=mock_tts,
1998:             ),
1999:             unittest.mock.patch(
2000:                 "Biz.PodcastItLater.Core.Database.update_job_status",
2001:             ) as mock_update,
2002:         ):
2003:             processor.process_job(job)
2004: 
2005:         # Job should be reset to pending due to shutdown
2006:         mock_update.assert_any_call(self.job_id, "pending")
2007: 
2008:     def test_cleanup_stale_jobs(self) -> None:
2009:         """Test cleanup of stale processing jobs."""
2010:         # Manually set job to processing
2011:         Core.Database.update_job_status(self.job_id, "processing")
2012: 
2013:         # Run cleanup
2014:         cleanup_stale_jobs()
2015: 
2016:         # Job should be back to pending
2017:         job = Core.Database.get_job_by_id(self.job_id)
2018:         if job is None:
2019:             msg = "no job found for %s"
2020:             raise Test.TestError(msg, self.job_id)
2021:         self.assertEqual(job["status"], "pending")
2022: 
2023:     def test_concurrent_processing(self) -> None:
2024:         """Handle multiple jobs."""
2025:         # Create multiple jobs
2026:         job2 = Core.Database.add_to_queue(
2027:             "https://example.com/2",
2028:             "test@example.com",
2029:             self.user_id,
2030:         )
2031:         job3 = Core.Database.add_to_queue(
2032:             "https://example.com/3",
2033:             "test@example.com",
2034:             self.user_id,
2035:         )
2036: 
2037:         # Get pending jobs
2038:         jobs = Core.Database.get_pending_jobs(limit=5)
2039: 
2040:         self.assertEqual(len(jobs), 3)
2041:         self.assertEqual({j["id"] for j in jobs}, {self.job_id, job2, job3})
2042: 
2043:     def test_memory_threshold_deferral(self) -> None:
2044:         """Test that jobs are deferred when memory usage is high."""
2045:         shutdown_handler = ShutdownHandler()
2046:         processor = ArticleProcessor(shutdown_handler)
2047:         job = Core.Database.get_job_by_id(self.job_id)
2048:         if job is None:
2049:             msg = "no job found for %s"
2050:             raise Test.TestError(msg, self.job_id)
2051: 
2052:         # Mock high memory usage
2053:         with (
2054:             unittest.mock.patch(
2055:                 "Biz.PodcastItLater.Worker.check_memory_usage",
2056:                 return_value=90.0,  # High memory usage
2057:             ),
2058:             unittest.mock.patch(
2059:                 "Biz.PodcastItLater.Core.Database.update_job_status",
2060:             ) as mock_update,
2061:         ):
2062:             processor.process_job(job)
2063: 
2064:         # Job should not be processed (no status updates)
2065:         mock_update.assert_not_called()
2066: 
2067: 
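The graceful-shutdown test expects the job to be reset to `"pending"` when shutdown is requested mid-pipeline. The underlying pattern is a `threading.Event` that signal handlers set and the worker polls between pipeline stages. A sketch under those assumptions (the abort callback standing in for the real status reset):

```python
import threading


class ShutdownHandler:
    """Signal handlers set this flag; the worker polls it between stages."""

    def __init__(self) -> None:
        self.shutdown_requested = threading.Event()


def run_stages(handler: ShutdownHandler, stages, on_abort) -> bool:
    """Run stages in order; bail out and call on_abort (e.g. reset the
    job to 'pending') if shutdown was requested between stages."""
    for stage in stages:
        if handler.shutdown_requested.is_set():
            on_abort()
            return False
        stage()
    return True
```

Checking only between stages means an in-flight TTS call is allowed to finish, which is why `mock_tts` in the test sets the flag and still returns audio data.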
Read Biz/PodcastItLater/Worker.py @2068-2155
88 lines
2068: class TestWorkerErrorHandling(Test.TestCase):
2069:     """Test worker error handling and recovery."""
2070: 
2071:     def setUp(self) -> None:
2072:         """Set up test environment."""
2073:         Core.Database.init_db()
2074:         self.user_id, _ = Core.Database.create_user("test@example.com")
2075:         self.job_id = Core.Database.add_to_queue(
2076:             "https://example.com",
2077:             "test@example.com",
2078:             self.user_id,
2079:         )
2080:         self.shutdown_handler = ShutdownHandler()
2081:         self.processor = ArticleProcessor(self.shutdown_handler)
2082: 
2083:     @staticmethod
2084:     def tearDown() -> None:
2085:         """Clean up."""
2086:         Core.Database.teardown()
2087: 
2088:     def test_process_pending_jobs_exception_handling(self) -> None:
2089:         """Test that process_pending_jobs handles exceptions."""
2090: 
2091:         def side_effect(job: dict[str, Any]) -> None:
2092:             # Simulate process_job starting and setting status to processing
2093:             Core.Database.update_job_status(job["id"], "processing")
2094:             msg = "Unexpected Error"
2095:             raise ValueError(msg)
2096: 
2097:         with (
2098:             unittest.mock.patch.object(
2099:                 self.processor,
2100:                 "process_job",
2101:                 side_effect=side_effect,
2102:             ),
2103:             unittest.mock.patch(
2104:                 "Biz.PodcastItLater.Core.Database.update_job_status",
2105:                 side_effect=Core.Database.update_job_status,
2106:             ) as _mock_update,
2107:         ):
2108:             process_pending_jobs(self.processor)
2109: 
2110:         # Job should be marked as error
2111:         job = Core.Database.get_job_by_id(self.job_id)
2112:         self.assertIsNotNone(job)
2113:         if job:
2114:             self.assertEqual(job["status"], "error")
2115:             self.assertIn("Unexpected Error", job["error_message"])
2116: 
2117:     def test_process_retryable_jobs_success(self) -> None:
2118:         """Test processing of retryable jobs."""
2119:         # Set up a retryable job
2120:         Core.Database.update_job_status(self.job_id, "error", "Fail 1")
2121: 
2122:         # Modify created_at to be in the past to satisfy backoff
2123:         with Core.Database.get_connection() as conn:
2124:             conn.execute(
2125:                 "UPDATE queue SET created_at = ? WHERE id = ?",
2126:                 (
2127:                     (
2128:                         datetime.now(tz=timezone.utc) - timedelta(minutes=5)
2129:                     ).isoformat(),
2130:                     self.job_id,
2131:                 ),
2132:             )
2133:             conn.commit()
2134: 
2135:         process_retryable_jobs()
2136: 
2137:         job = Core.Database.get_job_by_id(self.job_id)
2138:         self.assertIsNotNone(job)
2139:         if job:
2140:             self.assertEqual(job["status"], "pending")
2141: 
2142:     def test_process_retryable_jobs_not_ready(self) -> None:
2143:         """Test that jobs are not retried before backoff period."""
2144:         # Set up a retryable job that just failed
2145:         Core.Database.update_job_status(self.job_id, "error", "Fail 1")
2146: 
2147:         # created_at is now, so backoff should prevent retry
2148:         process_retryable_jobs()
2149: 
2150:         job = Core.Database.get_job_by_id(self.job_id)
2151:         self.assertIsNotNone(job)
2152:         if job:
2153:             self.assertEqual(job["status"], "error")
2154: 
2155: 
💭Thought[engineer]3 months ago

Perfect! Now I have a complete understanding. Let me start creating the new module files. I'll follow the pattern where tests are kept near the code they test.

Let's start with Worker/Processor.py:

Write Biz/PodcastItLater/Worker/Processor.py
💭Thought[engineer]3 months ago

Now let's create the TextProcessing module:

Write Biz/PodcastItLater/Worker/TextProcessing.py
💭Thought[engineer]3 months ago

Now let's create the Jobs module. Let me first read more of the test code to understand better:

Write Biz/PodcastItLater/Worker/Jobs.py
💭Thought[engineer]3 months ago

Now I need to update the main Worker.py file to remove the moved code and import from the new modules:

Read Biz/PodcastItLater/Worker.py @1-66
66 lines
1: """Background worker for processing article-to-podcast conversions."""
2: 
3: # : dep boto3
4: # : dep botocore
5: # : dep openai
6: # : dep psutil
7: # : dep pydub
8: # : dep pytest
9: # : dep pytest-asyncio
10: # : dep pytest-mock
11: # : dep trafilatura
12: # : out podcastitlater-worker
13: # : run ffmpeg
14: import Biz.PodcastItLater.Core as Core
15: import boto3  # type: ignore[import-untyped]
16: import concurrent.futures
17: import io
18: import json
19: import logging
20: import Omni.App as App
21: import Omni.Log as Log
22: import Omni.Test as Test
23: import openai
24: import operator
25: import os
26: import psutil  # type: ignore[import-untyped]
27: import pytest
28: import signal
29: import sys
30: import tempfile
31: import threading
32: import time
33: import trafilatura
34: import typing
35: import unittest.mock
36: from botocore.exceptions import ClientError  # type: ignore[import-untyped]
37: from datetime import datetime
38: from datetime import timedelta
39: from datetime import timezone
40: from pathlib import Path
41: from pydub import AudioSegment  # type: ignore[import-untyped]
42: from typing import Any
43: 
44: logger = logging.getLogger(__name__)
45: Log.setup(logger)
46: 
47: # Configuration from environment variables
48: OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
49: S3_ENDPOINT = os.getenv("S3_ENDPOINT")
50: S3_BUCKET = os.getenv("S3_BUCKET")
51: S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY")
52: S3_SECRET_KEY = os.getenv("S3_SECRET_KEY")
53: area = App.from_env()
54: 
55: # Worker configuration
56: MAX_CONTENT_LENGTH = 5000  # characters for TTS
57: MAX_ARTICLE_SIZE = 500_000  # 500KB character limit for articles
58: POLL_INTERVAL = 30  # seconds
59: MAX_RETRIES = 3
60: TTS_MODEL = "tts-1"
61: TTS_VOICE = "alloy"
62: MEMORY_THRESHOLD = 80  # Percentage threshold for memory usage
63: CROSSFADE_DURATION = 500  # ms for crossfading segments
64: PAUSE_DURATION = 1000  # ms for silence between segments
65: 
66: 
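The constants above (`MAX_CONTENT_LENGTH` in particular) feed the text-processing helpers headed for Worker/TextProcessing.py. The original `split_text_into_chunks` (lines 761-798) isn't shown in the transcript, so the following sentence-greedy chunker is only a sketch of what such a helper could look like, not the repo's implementation:

```python
# Assumption: chunks are built by packing whole sentences up to the TTS limit.
MAX_CONTENT_LENGTH = 5000  # characters per TTS request, as in Worker.py


def split_text_into_chunks(text: str, limit: int = MAX_CONTENT_LENGTH) -> list[str]:
    """Greedily pack whole sentences into chunks no longer than `limit`."""
    chunks: list[str] = []
    current = ""
    for sentence in text.replace("\n", " ").split(". "):
        candidate = f"{current} {sentence}".strip()
        if len(candidate) <= limit:
            current = candidate
        else:
            if current:
                chunks.append(current)
            # A single oversized sentence is truncated to the limit.
            current = sentence[:limit]
    if current:
        chunks.append(current)
    return chunks
```

Keeping the limit as a parameter with the module constant as its default makes the chunker easy to exercise in tests without 5000-character fixtures.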
💭Thought[engineer]3 months ago

Now I'll create the new Worker.py file with imports from the new modules and the remaining code:

Write Biz/PodcastItLater/Worker.py
💭Thought[engineer]3 months ago

There's a small issue in Worker.py: I used timezone.timedelta instead of just timedelta. Let me fix that:

Edit Biz/PodcastItLater/Worker.py
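The mix-up is easy to reproduce: `timedelta` is a top-level name in the `datetime` module, not an attribute of the `timezone` class, so the wrong spelling raises at runtime. A minimal sketch (the 30-minute cutoff is illustrative, not the repo's value):

```python
from datetime import datetime, timedelta, timezone

now = datetime.now(timezone.utc)

# Wrong spelling: timezone has no timedelta attribute, so this raises.
try:
    cutoff = now - timezone.timedelta(minutes=30)
except AttributeError:
    failed = True

# Correct spelling, matching the `from datetime import timedelta`
# import already present in Worker.py's header.
cutoff = now - timedelta(minutes=30)
```

Because Worker.py imports the three names individually rather than the `datetime` module itself, only the bare `timedelta` form resolves.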
💭Thought[engineer]3 months ago

Now let me run the tests to see if everything works:

ϟcd /tmp/tmp.YODuNmWBNL && bild --test Biz/PodcastItLater/Worker/py.hs 2>&1 | head -100
/bin/sh: line 1: cd: /tmp/tmp.YODuNmWBNL: No such file or directory
ϟbild --test Biz/PodcastItLater/Worker/py.hs 2>&1 | head -150
fail:  bild:  nothing to build
💭Thought[engineer]3 months ago

The .hs file doesn't exist. Let me check the namespace structure:

ϟls -la Biz/PodcastItLater/Worker* 2>&1
-rw-r--r-- 1 ben users 1910 Sep  5 14:25 Biz/PodcastItLater/Worker.nix
-rw-r--r-- 1 ben users 8432 Dec  2 14:23 Biz/PodcastItLater/Worker.py

Biz/PodcastItLater/Worker:
total 80
drwxr-xr-x 2 ben users  4096 Dec  2 14:22 .
drwxr-xr-x 4 ben users  4096 Dec  2 14:16 ..
-rw-r--r-- 1 ben users 16106 Dec  2 14:22 Jobs.py
-rw-r--r-- 1 ben users 48166 Dec  2 14:20 Processor.py
-rw-r--r-- 1 ben users  6601 Dec  2 14:20 TextProcessing.py
💭Thought[engineer]3 months ago

Good! The files are created. Let me test the Worker.py file directly:

ϟbild --test Biz/PodcastItLater/Worker.py 2>&1 | head -200
29 lines
[.] Biz/PodcastItLater/Worker.py
[+] Biz/PodcastItLater/Worker.py
[~] Biz/PodcastItLater/Worker.py: error:       … while evaluating the attrib...
error:
       … while evaluating the attribute 'drvPath'
         at /nix/store/b462w1d6vfabn5dqzkxr4fpsd467x2gx-nixos-unstable-src/lib/customisation.nix:429:7:
          428|     // {
          429|       drvPath =
             |       ^
          430|         assert condition;

       … while calling the 'derivationStrict' builtin
         at <nix/derivation-internal.nix>:37:12:
           36|
           37|   strict = derivationStrict drvAttrs;
             |            ^
           38|

       (stack trace truncated; use '--show-trace' to show the full, detailed trace)

       error: attribute 'setuptools' missing
       at /home/ben/omni/Omni/Bild/Builder.nix:284:25:
          283|         pyproject = true;
          284|         build-system = [python.packages.setuptools];
             |                         ^
          285|         nativeBuildInputs = [makeWrapper];
fail:  bild:  instantiate:  Biz/PodcastItLater/Worker.py
fail:  bild:  timeout after 10 minutes
⚠️Guardrail[system]3 months ago
{"contents":[402.5556000000001,400],"tag":"GuardrailCostExceeded"}
Error[engineer]3 months ago
{"reason":"guardrail_violation"}
💬[engineer]3 months ago

Guardrail: cost budget exceeded (402.5556000000001/400.0 cents)

🔄[engineer]InProgress → NeedsHelp3 months ago
🔄[human]NeedsHelp → Done3 months ago