diff --git a/.env.example b/.env.example index 1e10ade..87c261b 100644 --- a/.env.example +++ b/.env.example @@ -21,23 +21,37 @@ BASE_URL=http://localhost:8000 ALLOW_HTTP_SESSIONS=true # Slack — one pair per agent (Bot User OAuth Token + App-Level Token) +# Add as many agents as needed using this pattern; no code changes required. +# SLACK_BOT_TOKEN_<AGENT>=xoxb-... (required) +# SLACK_APP_TOKEN_<AGENT>=xapp-... (optional) SLACK_BOT_TOKEN_SU=xoxb-placeholder -SLACK_APP_TOKEN_SU=xapp-placeholder SLACK_BOT_TOKEN_WISEMAN=xoxb-placeholder -SLACK_APP_TOKEN_WISEMAN=xapp-placeholder -SLACK_BOT_TOKEN_LOTZ=xoxb-placeholder -SLACK_APP_TOKEN_LOTZ=xapp-placeholder -SLACK_BOT_TOKEN_CRAVATT=xoxb-placeholder -SLACK_APP_TOKEN_CRAVATT=xapp-placeholder -SLACK_BOT_TOKEN_GROTJAHN=xoxb-placeholder -SLACK_APP_TOKEN_GROTJAHN=xapp-placeholder -SLACK_BOT_TOKEN_PETRASCHECK=xoxb-placeholder -SLACK_APP_TOKEN_PETRASCHECK=xapp-placeholder -SLACK_BOT_TOKEN_KEN=xoxb-placeholder -SLACK_APP_TOKEN_KEN=xapp-placeholder -SLACK_BOT_TOKEN_RACKI=xoxb-placeholder -SLACK_APP_TOKEN_RACKI=xapp-placeholder -SLACK_BOT_TOKEN_SAEZ=xoxb-placeholder -SLACK_APP_TOKEN_SAEZ=xapp-placeholder -SLACK_BOT_TOKEN_WU=xoxb-placeholder -SLACK_APP_TOKEN_WU=xapp-placeholder +SLACK_BOT_TOKEN_GRANTBOT=xoxb-placeholder + +# Podcast TTS backend: "mistral" (default), "openai", or "local" (vLLM-Omni server) +PODCAST_TTS_BACKEND="mistral" + +# Mistral AI TTS (used when PODCAST_TTS_BACKEND=mistral) +MISTRAL_API_KEY=your-mistral-api-key +MISTRAL_TTS_MODEL=voxtral-mini-tts-latest +MISTRAL_TTS_DEFAULT_VOICE=your-voice-uuid + +# OpenAI TTS (used when PODCAST_TTS_BACKEND=openai) +# Voices: alloy echo fable onyx nova shimmer +# Models: tts-1 tts-1-hd gpt-4o-mini-tts +OPENAI_API_KEY=your-openai-api-key +OPENAI_TTS_MODEL=tts-1 +OPENAI_TTS_DEFAULT_VOICE=alloy + +# Local vLLM-Omni TTS server (used when PODCAST_TTS_BACKEND=local) +# Start with: vllm serve --port 8008 (must match LOCAL_TTS_PORT below) +LOCAL_TTS_HOST=127.0.0.1 +LOCAL_TTS_PORT=8008 +LOCAL_TTS_MODEL=mistralai/Voxtral-4B-TTS-2603 
+LOCAL_TTS_VOICE=default + +# Podcast +PODCAST_BASE_URL=http://localhost:8001 +PODCAST_SEARCH_WINDOW_DAYS=14 +PODCAST_MAX_CANDIDATES=50 +# PODCAST_NORMALIZE_AUDIO=true # uncomment to enable ffmpeg loudnorm post-processing (EBU R128, -16 LUFS) diff --git a/.gitignore b/.gitignore index aad82ec..342842f 100644 --- a/.gitignore +++ b/.gitignore @@ -53,3 +53,9 @@ certbot/ .pytest_cache/ .coverage htmlcov/ + +# Runtime data (state files, generated audio — ephemeral) +data/ + +# Test output artifacts +.labbot-tests/ diff --git a/AGENT.md b/AGENT.md index a94b338..39628fc 100644 --- a/AGENT.md +++ b/AGENT.md @@ -32,6 +32,7 @@ All specs are in `/specs/`: - `profile-ingestion.md` — 9-step pipeline, ORCID → PubMed → PMC → LLM - `admin-dashboard.md` — read-only, server-rendered, impersonation - `agent-system.md` — Slack Bolt, Socket Mode, two-phase LLM calls, simulation engine +- `labbot-podcast.md` — daily personalized research briefing: PubMed search, LLM selection/summarization, Local or API TTS, Slack DM delivery, per-PI RSS podcast feed ## Tech Stack diff --git a/CLAUDE.md b/CLAUDE.md index 66a844b..41d34bd 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -42,3 +42,50 @@ docker compose --profile agent run -d --name agent-run agent python -m src.agent ``` **Note:** The agent-run container uses mounted source code but the Python process only loads modules at startup. Code changes require a container restart to take effect. **After any code change that affects the running agent process, flag this to the user so they can decide whether to restart.** + +## Podcast Pipeline + +The LabBot Podcast pipeline (specs/labbot-podcast.md) runs daily at 9am UTC for each active agent: + +1. Build PubMed queries from lab's public profile +2. Fetch candidates from PubMed + bioRxiv + medRxiv + arXiv (last 14 days, up to 50+10 candidates) +3. Claude Sonnet selects most relevant paper (applying PI's podcast preferences from their private ProfileRevision) +4. 
Claude Opus writes a ~250-word structured brief +5. TTS audio generated (Mistral or local vLLM-Omni); ffmpeg loudnorm applied if PODCAST_NORMALIZE_AUDIO=true +6. Slack DM sent to PI with text summary + RSS link +7. RSS feed available at `/podcast/{agent_id}/feed.xml` +8. Audio served at `/podcast/{agent_id}/audio/{date}.mp3` + +Preprint IDs use prefixed format: `biorxiv:...`, `medrxiv:...`, `arxiv:...`. The `paper_url` in summaries links to the correct server (not always PubMed). + +```bash +# Run podcast pipeline once for all active agents +docker compose --profile podcast run --rm podcast python -m src.podcast.main + +# Test pipeline for 'su' agent only +docker compose exec app python scripts/test_podcast_su.py +``` + +## Database Migration Caveat + +If the DB was initialized from the `main` branch schema and then this branch is checked out, `alembic upgrade head` will stamp the version without re-running migrations that share a revision ID with ones already applied on `main`. Any columns added by branch-specific migrations may be silently missing. + +**Symptom:** `UndefinedColumnError` at runtime despite `alembic current` showing `head`. + +**Fix:** Check for missing columns and apply them manually: +```bash +docker compose exec app python -c " +import asyncio +from src.database import get_engine +from sqlalchemy import text + +async def check(): + eng = get_engine() + async with eng.connect() as conn: + result = await conn.execute(text(\"SELECT column_name FROM information_schema.columns WHERE table_name='researcher_profiles' ORDER BY ordinal_position\")) + print([r[0] for r in result]) + +asyncio.run(check()) +" +``` +Then add any missing columns with `ALTER TABLE ... ADD COLUMN IF NOT EXISTS ...`. 
diff --git a/Dockerfile b/Dockerfile index c032e95..63a7b94 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,6 +6,7 @@ WORKDIR /app RUN apt-get update && apt-get install -y --no-install-recommends \ gcc \ libpq-dev \ + ffmpeg \ && rm -rf /var/lib/apt/lists/* # Install Python dependencies diff --git a/alembic/versions/0010_add_podcast_episodes.py b/alembic/versions/0010_add_podcast_episodes.py new file mode 100644 index 0000000..adad7d2 --- /dev/null +++ b/alembic/versions/0010_add_podcast_episodes.py @@ -0,0 +1,56 @@ +"""Add podcast_episodes table + +Revision ID: 0010 +Revises: 0009 +Create Date: 2026-04-09 00:00:00.000000 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +from alembic import op + +revision: str = "0010" +down_revision: Union[str, None] = "0009" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "podcast_episodes", + sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True), + sa.Column("agent_id", sa.String(50), nullable=False), + sa.Column("episode_date", sa.Date, nullable=False), + sa.Column("pmid", sa.String(100), nullable=False), + sa.Column("paper_title", sa.String(500), nullable=False), + sa.Column("paper_authors", sa.String(500), nullable=False), + sa.Column("paper_journal", sa.String(255), nullable=False), + sa.Column("paper_year", sa.Integer, nullable=False), + sa.Column("text_summary", sa.Text, nullable=False), + sa.Column("audio_file_path", sa.String(500), nullable=True), + sa.Column("audio_duration_seconds", sa.Integer, nullable=True), + sa.Column("slack_delivered", sa.Boolean, nullable=False, server_default="false"), + sa.Column("selection_justification", sa.Text, nullable=False), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + server_default=sa.func.now(), + nullable=False, + ), + ) + op.create_index("ix_podcast_episodes_agent_id", 
"podcast_episodes", ["agent_id"]) + op.create_index("ix_podcast_episodes_episode_date", "podcast_episodes", ["episode_date"]) + op.create_unique_constraint( + "uq_podcast_agent_date", "podcast_episodes", ["agent_id", "episode_date"] + ) + + +def downgrade() -> None: + op.drop_constraint("uq_podcast_agent_date", "podcast_episodes") + op.drop_index("ix_podcast_episodes_episode_date") + op.drop_index("ix_podcast_episodes_agent_id") + op.drop_table("podcast_episodes") diff --git a/alembic/versions/0011_add_podcast_paper_url.py b/alembic/versions/0011_add_podcast_paper_url.py new file mode 100644 index 0000000..f5624dc --- /dev/null +++ b/alembic/versions/0011_add_podcast_paper_url.py @@ -0,0 +1,29 @@ +"""Add paper_url column to podcast_episodes + +Revision ID: 0011 +Revises: 0010 +Create Date: 2026-04-10 00:00:00.000000 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa + +from alembic import op + +revision: str = "0011" +down_revision: Union[str, None] = "0010" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column( + "podcast_episodes", + sa.Column("paper_url", sa.String(1000), nullable=True), + ) + + +def downgrade() -> None: + op.drop_column("podcast_episodes", "paper_url") diff --git a/alembic/versions/0012_add_podcast_preferences.py b/alembic/versions/0012_add_podcast_preferences.py new file mode 100644 index 0000000..bba69c7 --- /dev/null +++ b/alembic/versions/0012_add_podcast_preferences.py @@ -0,0 +1,64 @@ +"""Add podcast_preferences table + +Revision ID: 0012 +Revises: 0011 +Create Date: 2026-04-14 00:00:00.000000 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql +from sqlalchemy.dialects.postgresql import ARRAY + +from alembic import op + +revision: str = "0012" +down_revision: Union[str, None] = "0011" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: 
Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "podcast_preferences", + sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True), + sa.Column("agent_id", sa.String(50), nullable=False), + sa.Column("voice_id", sa.String(100), nullable=True), + sa.Column( + "extra_keywords", + ARRAY(sa.String), + nullable=False, + server_default="{}", + ), + sa.Column( + "preferred_journals", + ARRAY(sa.String), + nullable=False, + server_default="{}", + ), + sa.Column( + "deprioritized_journals", + ARRAY(sa.String), + nullable=False, + server_default="{}", + ), + sa.Column( + "updated_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.func.now(), + ), + ) + op.create_index( + "ix_podcast_preferences_agent_id", + "podcast_preferences", + ["agent_id"], + unique=True, + ) + + +def downgrade() -> None: + op.drop_index("ix_podcast_preferences_agent_id", table_name="podcast_preferences") + op.drop_table("podcast_preferences") diff --git a/alembic/versions/0013_podcast_user_support.py b/alembic/versions/0013_podcast_user_support.py new file mode 100644 index 0000000..89d77cd --- /dev/null +++ b/alembic/versions/0013_podcast_user_support.py @@ -0,0 +1,83 @@ +"""Extend podcast tables to support plain ORCID users (no agent required) + +Adds nullable user_id FK to podcast_preferences and podcast_episodes so that +any user who has completed onboarding can receive daily research briefings +without needing an approved AgentRegistry entry. 
+ +Changes: + - podcast_preferences.agent_id: NOT NULL → nullable + - podcast_preferences.user_id: new nullable FK → users.id, unique index + - podcast_episodes.agent_id: NOT NULL → nullable + - podcast_episodes.user_id: new nullable FK → users.id + - podcast_episodes: partial unique index on (user_id, episode_date) WHERE user_id IS NOT NULL + +Revision ID: 0013 +Revises: 0012 +Create Date: 2026-04-14 00:00:00.000000 +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import UUID + +from alembic import op + +revision: str = "0013" +down_revision: Union[str, None] = "0012" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # --- podcast_preferences --- + # Make agent_id nullable (existing agent rows keep their values) + op.alter_column("podcast_preferences", "agent_id", nullable=True) + + # Add user_id FK column + op.add_column( + "podcast_preferences", + sa.Column( + "user_id", + UUID(as_uuid=True), + sa.ForeignKey("users.id", ondelete="CASCADE"), + nullable=True, + ), + ) + op.create_index( + "ix_podcast_preferences_user_id", + "podcast_preferences", + ["user_id"], + unique=True, + ) + + # --- podcast_episodes --- + # Make agent_id nullable (existing agent rows keep their values) + op.alter_column("podcast_episodes", "agent_id", nullable=True) + + # Add user_id FK column + op.add_column( + "podcast_episodes", + sa.Column( + "user_id", + UUID(as_uuid=True), + sa.ForeignKey("users.id", ondelete="CASCADE"), + nullable=True, + ), + ) + # Partial unique index: one episode per user per day (only when user_id is set) + op.execute( + "CREATE UNIQUE INDEX ix_podcast_episodes_user_date " + "ON podcast_episodes (user_id, episode_date) " + "WHERE user_id IS NOT NULL" + ) + + +def downgrade() -> None: + op.execute("DROP INDEX IF EXISTS ix_podcast_episodes_user_date") + op.drop_column("podcast_episodes", "user_id") + 
op.alter_column("podcast_episodes", "agent_id", nullable=False) + + op.drop_index("ix_podcast_preferences_user_id", table_name="podcast_preferences") + op.drop_column("podcast_preferences", "user_id") + op.alter_column("podcast_preferences", "agent_id", nullable=False) diff --git a/code_review.md b/code_review.md new file mode 100644 index 0000000..fbf0a1d --- /dev/null +++ b/code_review.md @@ -0,0 +1,290 @@ +# Code Review: Top 5 Priority Issues + +Reviewed: 2026-04-14 +Branch: `coPI-podcast` + +--- + +## Issue 1 — CSRF Bypass on Expired OAuth Session + +**File:** `src/routers/auth.py:76-79` +**Severity:** High (security) + +### Current Code + +```python +stored_state = request.session.pop("oauth_state", None) +if stored_state and state != stored_state: + logger.warning("OAuth state mismatch") + return RedirectResponse(url="/login?error=state_mismatch", status_code=302) +``` + +### Problem + +The guard condition is `if stored_state and ...`, meaning it only enforces the check when `stored_state` is truthy. If the user's session has expired (or was never set), `stored_state` is `None` and the entire check is skipped — any `state` value (including `None`) passes through. A CSRF attacker can initiate an OAuth flow, let the victim's session expire, then replay the callback with an arbitrary code. + +### Best Practice + +Per [RFC 6749 §10.12](https://datatracker.ietf.org/doc/html/rfc6749#section-10.12) and OWASP OAuth guidelines, the `state` parameter must be treated as a **required, non-nullable nonce**. The correct pattern is to reject the callback if `stored_state` is missing (session expired), not to treat it as a pass condition. 
+ +### How to Fix + +Change the condition from a two-branch `if stored_state and ...` guard to an explicit three-case rejection: + +```python +stored_state = request.session.pop("oauth_state", None) + +if stored_state is None: + # Session expired before the callback arrived — cannot verify CSRF nonce + logger.warning("OAuth callback with no stored state (session expired or missing)") + return RedirectResponse(url="/login?error=session_expired", status_code=302) + +if state != stored_state: + logger.warning("OAuth state mismatch — possible CSRF attempt") + return RedirectResponse(url="/login?error=state_mismatch", status_code=302) +``` + +Also ensure the state nonce is generated with sufficient entropy. In `src/routers/auth.py` (in the `/login` route that initiates the flow), use `secrets.token_urlsafe(32)` rather than any shorter or predictable token, and store it in the session immediately before the redirect. + +--- + +## Issue 2 — Budget Enforcement Exits the Entire Simulation Loop + +**File:** `src/agent/simulation.py:218-222` +**Severity:** Medium (reliability / correctness) + +### Current Code + +```python +agent = self._select_agent() +if not agent or not self._agent_within_budget(agent): + # All agents over budget + logger.info("All agents over budget or no agent selected. Stopping.") + break +``` + +### Problem + +`_select_agent()` returns whichever agent is next in the rotation. If that specific agent is over budget, the entire simulation `break`s — even if every other agent still has budget remaining. The log comment says "All agents over budget" but that is only true in the case where `_select_agent` returns `None`; when it returns an agent that is individually over budget, the others are never checked. + +### Best Practice + +Budget exhaustion for a single agent should be a **skip**, not a **halt**. The loop should continue cycling through agents until every agent is either over budget or no agent can be selected at all. 
A common pattern is to track how many consecutive agents have been skipped and stop only when the skip count equals the total number of agents. + +### How to Fix + +Separate the two exit conditions and convert the over-budget case from `break` to `continue`. Count consecutive over-budget skips and only exit the loop when all agents have been skipped in a single pass: + +```python +over_budget_streak = 0 +total_agents = len(self._agents) + +while True: + agent = self._select_agent() + if not agent: + logger.info("No agent selected — simulation complete.") + break + + if not self._agent_within_budget(agent): + over_budget_streak += 1 + agent.state.last_selected = time.time() + if over_budget_streak >= total_agents: + logger.info("All agents over budget. Stopping.") + break + logger.debug("[%s] Over budget, skipping.", agent.agent_id) + continue + + over_budget_streak = 0 # reset when a valid agent is found + # ... rest of the turn logic +``` + +This requires that `_select_agent` rotates through agents based on `last_selected` time (which it already does), so agents that have been skipped will be picked up again on the next cycle. + +--- + +## Issue 3 — RSS Feed Served with Missing Audio File + +**File:** `src/podcast/main.py:89-103`, `src/podcast/pipeline.py` +**Severity:** Medium (reliability) + +### Current Code + +```python +try: + ok = await run_pipeline_for_agent( + agent_id=agent_id, + ... + ) + if ok: + produced.append(agent_id) +except Exception as exc: + logger.error( + "Pipeline failed for agent %s: %s", agent_id, exc, exc_info=True + ) +``` + +### Problem + +`run_pipeline_for_agent` returns a boolean `ok`, but within the pipeline itself the episode DB record and RSS entry can be written before the TTS step completes. If TTS fails, the audio file does not exist, but the feed already contains an `<enclosure>` pointing to a non-existent MP3. Any podcast client that subscribed to the feed will attempt a GET on a 404 URL and may display a broken episode permanently. 
+ +### Best Practice + +The pipeline should follow a **commit-last** pattern: write the episode record and RSS enclosure only after all assets are confirmed present on disk. This is the same pattern used in video/audio platforms (e.g., YouTube's upload pipeline) — metadata is published only after the binary asset is available. + +### How to Fix + +Inside `src/podcast/pipeline.py`, restructure the steps in this order: + +1. Fetch and select the paper (read-only, safe to do first). +2. Generate the text brief (Claude Opus call). +3. Call TTS and write the audio file to disk. **Capture the returned path.** +4. Verify the audio file exists and has a non-zero size (`path.stat().st_size > 0`) before proceeding. +5. Only if step 4 passes: write the `PodcastEpisode` DB row and call `db_session.flush()`. +6. Only after the DB row is committed: build and write the RSS `<enclosure>`. + +If TTS fails at step 3, log the error and return `ok=False` without writing anything to the DB or RSS. The caller in `main.py` already handles `ok=False` correctly; the gap is in the pipeline not propagating TTS failures as `False`. + +As a secondary safeguard, the RSS endpoint (`/podcast/{agent_id}/feed.xml`) should check whether `data/podcast_audio/{agent_id}/{date}.mp3` exists before including the `<enclosure>` element in its output. This prevents any historical DB rows with missing audio from appearing in the feed. + +--- + +## Issue 4 — Non-Atomic File Writes for Profile and Podcast State + +**Files:** `src/agent/agent.py:423-444`, `src/podcast/state.py:22-24` +**Severity:** Medium (data integrity) + +### Current Code + +```python +# agent.py +memory_path.write_text(new_memory + "\n", encoding="utf-8") + +# state.py +def _save(data: dict) -> None: + STATE_FILE.parent.mkdir(parents=True, exist_ok=True) + STATE_FILE.write_text(json.dumps(data, indent=2), encoding="utf-8") +``` + +### Problem + +`Path.write_text` is not atomic — it opens the file for truncation and writes in multiple OS-level operations. 
If the process crashes, is killed, or two coroutines call the write concurrently, the file can be left in a partially written state (empty, or with truncated JSON). For `podcast_state.json`, this means the `delivered_pmids` list can be lost, causing duplicate Slack DMs. For working memory files, a partial write silently discards the agent's accumulated context. + +There is also a logical race: `_save` in `state.py` does a read-modify-write cycle (`_load()` → modify → `_save()`). Two concurrent podcast pipeline runs (possible if the scheduler is invoked twice) will both read the same initial state, both modify it independently, and whichever writes last will silently overwrite the other's changes. + +### Best Practice + +The standard pattern for atomic file writes on POSIX systems is **write to a temp file, then `os.rename`**. Because `rename` is guaranteed atomic by the POSIX spec (it is a single syscall), a reader will always see either the old complete file or the new complete file — never a partial write. Python's `tempfile.NamedTemporaryFile` with `delete=False` in the same directory is the standard way to achieve this. + +For the read-modify-write race in `state.py`, use a `threading.Lock` (or `asyncio.Lock` if the callers are async) as a process-level mutex around all load/save operations. 
+ +### How to Fix + +**Atomic write helper** (can live in `src/utils.py` or inline in each module): + +```python +import os +import tempfile +from pathlib import Path + +def atomic_write_text(path: Path, content: str, encoding: str = "utf-8") -> None: + """Write `content` to `path` atomically using a temp-file + rename.""" + path.parent.mkdir(parents=True, exist_ok=True) + fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp") + try: + with os.fdopen(fd, "w", encoding=encoding) as f: + f.write(content) + os.replace(tmp, path) # atomic on POSIX; overwrites destination + except Exception: + os.unlink(tmp) # clean up temp file on any error + raise +``` + +Replace all four `path.write_text(...)` calls in `agent.py` (lines 428 and 441) and `state.py` (line 24) with `atomic_write_text(path, content)`. + +**Lock for state.py read-modify-write:** + +```python +import threading +_STATE_LOCK = threading.Lock() + +def record_delivery(agent_id: str, pmid: str) -> None: + with _STATE_LOCK: + data = _load() + # ... modify ... + _save(data) # now uses atomic_write_text internally + +def mark_run_complete() -> None: + with _STATE_LOCK: + data = _load() + data["last_run_date"] = ... + _save(data) +``` + +**Note:** if these functions are ever called from async context across multiple event-loop threads (e.g., concurrent `run_pipeline_for_agent` calls), a `threading.Lock` is sufficient because `asyncio.run` uses a single thread per call. If concurrency is ever introduced via `asyncio.gather`, switch to `asyncio.Lock`. 
+ +--- + +## Issue 5 — Per-Task Failures Silently Discarded in `asyncio.gather` + +**File:** `src/agent/simulation.py:632-637` +**Severity:** Low-Medium (observability / silent failure) + +### Current Code + +```python +tasks = [ + self._reply_to_thread(agent, thread) + for thread in threads_to_reply +] +await asyncio.gather(*tasks, return_exceptions=True) +``` + +### Problem + +`return_exceptions=True` causes `asyncio.gather` to return exceptions as result values instead of re-raising them. The return value here is discarded entirely, so any exceptions from individual `_reply_to_thread` calls are silently swallowed. If a Slack API error, DB write failure, or Claude API timeout occurs in any thread reply, it is invisible in logs and metrics. Operators have no signal that Phase 4 is partially or fully failing. + +### Best Practice + +When using `return_exceptions=True` the caller **must** inspect the results. The canonical pattern is to iterate the results list and log (or re-raise) any values that are `isinstance(r, BaseException)`. This is preferable to removing `return_exceptions=True` (which would cancel all remaining tasks on the first failure) because Phase 4 replies are independent — a failure on one thread should not prevent replies to others. + +### How to Fix + +Capture the return value of `asyncio.gather` and inspect each result: + +```python +results = await asyncio.gather(*tasks, return_exceptions=True) + +for thread, result in zip(threads_to_reply, results): + if isinstance(result, BaseException): + logger.error( + "[%s] Phase 4: Failed to reply to thread %s: %s", + agent.agent_id, + thread.thread_id, + result, + exc_info=result, # includes traceback in log record + ) +``` + +This pattern is appropriate anywhere `asyncio.gather(..., return_exceptions=True)` is used without inspecting results. There is a similar call site in `src/agent/simulation.py` for channel scanning — apply the same pattern there. 
Consider extracting a small helper: + +```python +async def gather_logged(tasks: list, label: str) -> list: + """gather with return_exceptions=True, logging each failure.""" + results = await asyncio.gather(*tasks, return_exceptions=True) + for i, r in enumerate(results): + if isinstance(r, BaseException): + logger.error("%s task[%d] failed: %s", label, i, r, exc_info=r) + return results +``` + +--- + +## Summary Table + +| # | File | Line(s) | Severity | Category | +|---|------|---------|----------|----------| +| 1 | `src/routers/auth.py` | 76-79 | High | Security — CSRF bypass | +| 2 | `src/agent/simulation.py` | 218-222 | Medium | Correctness — premature loop exit | +| 3 | `src/podcast/pipeline.py` + `main.py` | pipeline write order | Medium | Reliability — broken RSS enclosure | +| 4 | `src/agent/agent.py` + `src/podcast/state.py` | 428, 441, 22-24 | Medium | Data integrity — non-atomic writes | +| 5 | `src/agent/simulation.py` | 637 | Low-Medium | Observability — silent task failures | diff --git a/docker-compose.prod.yml b/docker-compose.prod.yml index 44dc726..3c0c371 100644 --- a/docker-compose.prod.yml +++ b/docker-compose.prod.yml @@ -35,6 +35,7 @@ services: volumes: - ./profiles:/app/profiles - ./prompts:/app/prompts + - podcast_data:/app/data depends_on: postgres: condition: service_healthy @@ -83,7 +84,6 @@ services: volumes: - ./profiles:/app/profiles - ./prompts:/app/prompts - - ./data:/app/data depends_on: postgres: condition: service_healthy @@ -108,7 +108,7 @@ services: volumes: - ./profiles:/app/profiles - ./prompts:/app/prompts - - ./data:/app/data + - grantbot_data:/app/data depends_on: postgres: condition: service_healthy @@ -120,6 +120,29 @@ services: awslogs-create-group: "true" awslogs-region: ${AWS_REGION:-us-east-2} + podcast: + build: + context: . 
+ restart: unless-stopped + command: ["python", "-m", "src.podcast.main", "scheduler", "--run-hour", "9"] + env_file: .env + environment: + DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-copi}:${POSTGRES_PASSWORD}@postgres:5432/${POSTGRES_DB:-copi} + volumes: + - ./profiles:/app/profiles + - ./prompts:/app/prompts + - podcast_data:/app/data + depends_on: + postgres: + condition: service_healthy + logging: + driver: awslogs + options: + awslogs-group: /copi/podcast + tag: podcast + awslogs-create-group: "true" + awslogs-region: ${AWS_REGION:-us-east-2} + nginx: image: nginx:1.27-alpine restart: unless-stopped @@ -167,3 +190,5 @@ services: volumes: pgdata: + grantbot_data: + podcast_data: diff --git a/docker-compose.yml b/docker-compose.yml index d686043..71d3fd9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -25,6 +25,7 @@ services: - .:/app - ./profiles:/app/profiles - ./prompts:/app/prompts + - ./data:/app/data depends_on: postgres: condition: service_healthy @@ -69,5 +70,25 @@ services: postgres: condition: service_healthy + podcast: + build: . + command: python -m src.podcast.main scheduler --run-hour 9 + env_file: .env + environment: + # Override LOCAL_TTS_HOST so the container can reach a vLLM-Omni server + # running on the host machine (127.0.0.1 does not reach the host from inside Docker). + LOCAL_TTS_HOST: host.docker.internal + extra_hosts: + # Ensures host.docker.internal resolves on Linux (Docker Desktop sets it automatically on Mac/Windows). + - "host.docker.internal:host-gateway" + volumes: + - .:/app + - ./profiles:/app/profiles + - ./prompts:/app/prompts + - ./data:/app/data + depends_on: + postgres: + condition: service_healthy + volumes: pgdata: diff --git a/prompts/podcast-select.md b/prompts/podcast-select.md new file mode 100644 index 0000000..121af03 --- /dev/null +++ b/prompts/podcast-select.md @@ -0,0 +1,46 @@ +You are a literature triage assistant for a scientific researcher. 
Your job is to identify the single most relevant and impactful recent paper from a list of candidates, based on the researcher's profile. + +## Researcher Profile + +{profile} + +## PI Podcast Preferences + +{preferences} + +## Task + +Below is a numbered list of recent publications (title + abstract). Select the ONE paper whose findings or outputs could most plausibly accelerate or inform a specific aspect of this researcher's ongoing work. + +Return your answer as JSON: +```json +{"index": <int — number of the chosen paper>, "justification": "<one-sentence reason>"} +``` + +If no paper clears the relevance bar, return: +```json +{"index": null, "justification": "No paper is sufficiently relevant to this researcher's current work."} +``` + +## Selection Criteria + +**INCLUDE** a paper if: +- Its findings or methods could directly accelerate a specific ongoing project, technique, or open question in the researcher's profile +- It releases a new tool, dataset, method, or reagent relevant to the researcher's techniques or targets +- It addresses a disease area, model system, or molecular target the researcher actively works on + +**EXCLUDE** a paper if: +- The connection to the researcher's work is only superficial or generic +- It is a review article, editorial, or commentary (no new primary data) +- It is purely clinical or epidemiological with no basic science relevance +- Recency alone makes it interesting — the connection must be specific and actionable + +**NOTE:** Some candidates are preprints (from bioRxiv, medRxiv, or arXiv) and are marked as such in the journal field. Preprints are valid candidates — treat them the same as peer-reviewed papers for selection purposes. + +**PREFER** papers that release a concrete output alongside findings (code, dataset, protocol, reagent, model). These tend to be immediately useful. + +**FOLLOW PI PREFERENCES:** If the PI Podcast Preferences section above contains specific instructions (e.g., topic focus, exclusions, prioritizations), apply them when selecting. 
PI preferences override the general criteria above. + +## Candidate Papers + +{candidates} diff --git a/prompts/podcast-summarize.md b/prompts/podcast-summarize.md new file mode 100644 index 0000000..1a96589 --- /dev/null +++ b/prompts/podcast-summarize.md @@ -0,0 +1,46 @@ +You are a science communicator writing a personalized research brief for a specific PI. Your goal is to help the PI quickly grasp whether and how a new paper is useful to their lab. + +## Researcher Profile + +{profile} + +## PI Podcast Preferences + +{preferences} + +## Paper + +{paper} + +## Task + +Write a structured research brief following the exact format below. Be specific, direct, and concise — like a knowledgeable postdoc briefing their PI. No filler phrases, no generic connections. + +--- + +*Today's Research Brief — {date}* + +*{paper_title}* +{authors} · {journal} · {year} + +*What they found:* +[2–3 sentences on core findings. Include specific results, effect sizes, or key observations. Be concrete — name specific proteins, pathways, organisms, or quantitative outcomes where relevant.] + +*Key output:* +[1–2 sentences on the tool, method, dataset, code, protocol, or reagent released with the paper. ONLY include this section if the paper releases a concrete artifact. If there is no distinct output, omit this section entirely — do not write "N/A" or a placeholder.] + +*Why this matters for your lab:* +[2–3 sentences connecting the paper specifically to this PI's work. You MUST name at least one specific technique, model system, molecular target, or open question from the researcher's profile. Do not write generic connections like "this is relevant to your proteomics work" — say exactly what aspect and how.] 
+ +*Link:* {paper_url} + +--- + +## Rules + +- Total length: approximately 200–280 words +- Tone: collegial and precise, not promotional +- The "Why this matters" section is the most important — make it specific to this researcher, not a general statement about the field +- If the PI Podcast Preferences section contains specific instructions on tone, focus, or framing, follow them +- If the abstract is all you have, base the brief on the abstract. Do not speculate about full-text content you weren't given. +- Do not add any text before or after the brief itself diff --git a/pyproject.toml b/pyproject.toml index d09fa83..6b780d2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ dependencies = [ "boto3>=1.34.0", "typer>=0.12.0", "rich>=13.7.0", + "mutagen>=1.47.0", ] [project.optional-dependencies] diff --git a/scripts/test_podcast_su.py b/scripts/test_podcast_su.py new file mode 100644 index 0000000..600c6e6 --- /dev/null +++ b/scripts/test_podcast_su.py @@ -0,0 +1,140 @@ +"""One-shot test: run the podcast pipeline for agent 'su' only. 
+ +Outputs: + .labbot-tests/su-summary-.txt — generated text summary + .labbot-tests/su-audio-.mp3 — TTS audio (if MISTRAL_API_KEY is set) + +Usage: + DATABASE_URL=postgresql+asyncpg://copi:copi@localhost:5432/copi \ + python scripts/test_podcast_su.py +""" + +import asyncio +import logging +import os +import shutil +from datetime import date +from pathlib import Path + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(name)s: %(message)s", +) +logger = logging.getLogger(__name__) + +OUTPUT_DIR = Path(".labbot-tests") +AUDIO_DIR = Path("data/podcast_audio") + + +async def run(): + from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine + from sqlalchemy.orm import sessionmaker + + from src.config import get_settings + from src.podcast.pipeline import ( + _generate_summary, + _load_podcast_preferences, + _load_public_profile, + _parse_profile_markdown, + _select_article, + _try_fetch_full_text, + ) + from src.podcast.tts_utils import get_audio_duration_seconds + from src.podcast.pubmed_search import build_queries, fetch_candidates + from src.podcast.state import get_delivered_pmids, record_delivery + + settings = get_settings() + agent_id = "su" + today = date.today() + OUTPUT_DIR.mkdir(exist_ok=True) + + logger.info("=== LabBot Podcast test run for agent: %s ===", agent_id) + + # 1. Load profiles + profile_text = _load_public_profile(agent_id) + if not profile_text: + logger.error("No public profile found for agent: %s", agent_id) + return + logger.info("Loaded profile (%d chars)", len(profile_text)) + + preferences_text = await _load_podcast_preferences(agent_id) + if preferences_text: + logger.info("Loaded podcast preferences (%d chars)", len(preferences_text)) + else: + logger.info("No podcast preferences found for agent: %s", agent_id) + + # 2. 
Build queries and fetch candidates + profile_dict = _parse_profile_markdown(profile_text) + queries = build_queries(profile_dict) + logger.info("Search queries: %s", queries) + + already_delivered = get_delivered_pmids(agent_id) + logger.info("Already delivered PMIDs: %s", already_delivered) + + candidates = await fetch_candidates( + queries, + already_delivered=already_delivered, + days=settings.podcast_search_window_days, + max_total=settings.podcast_max_candidates, + ) + logger.info("Fetched %d candidates", len(candidates)) + if not candidates: + logger.error("No candidate articles found — aborting") + return + + # 3. LLM article selection + selected, justification = await _select_article(profile_text, candidates, agent_id, preferences_text) + if selected is None: + logger.error("No article selected — aborting") + return + pmid = selected.get("pmid", "") + logger.info("Selected PMID: %s", pmid) + logger.info("Justification: %s", justification) + + # 4. Fetch full text + full_text = await _try_fetch_full_text(pmid) + logger.info("Full text fetched: %s", bool(full_text)) + + # 5. Generate text summary + summary = await _generate_summary(profile_text, selected, full_text, agent_id, preferences_text) + if not summary: + logger.error("Summary generation failed — aborting") + return + + summary_path = OUTPUT_DIR / f"su-summary-{today.isoformat()}.txt" + summary_path.write_text(summary, encoding="utf-8") + logger.info("Summary written to %s", summary_path) + print("\n" + "=" * 60) + print("TEXT SUMMARY") + print("=" * 60) + print(summary) + print("=" * 60 + "\n") + + # 6. 
Generate audio — dispatch to backend configured by PODCAST_TTS_BACKEND + if settings.podcast_tts_backend == "local": + from src.podcast.local_tts import generate_audio + logger.info("TTS backend: local vLLM-Omni (%s:%s)", settings.local_tts_host, settings.local_tts_port) + else: + from src.podcast.mistral_tts import generate_audio + logger.info("TTS backend: Mistral AI (%s)", settings.mistral_tts_model) + + audio_src = AUDIO_DIR / agent_id / f"{today.isoformat()}.mp3" + audio_ok = await generate_audio(summary, agent_id, audio_src) + + if audio_ok: + audio_dest = OUTPUT_DIR / f"su-audio-{today.isoformat()}.mp3" + shutil.copy2(audio_src, audio_dest) + duration = get_audio_duration_seconds(audio_src) + logger.info("Audio saved to %s (duration: %ss)", audio_dest, duration) + else: + logger.warning("Audio generation failed (backend: %s)", settings.podcast_tts_backend) + + logger.info("=== Test run complete ===") + logger.info(" PMID: %s", pmid) + logger.info(" Summary: %s", summary_path) + if audio_ok: + logger.info(" Audio: %s", audio_dest) + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/specs/labbot-podcast.md b/specs/labbot-podcast.md new file mode 100644 index 0000000..6ad3bc1 --- /dev/null +++ b/specs/labbot-podcast.md @@ -0,0 +1,616 @@ +# LabBot Podcast Specification + +## Overview + +LabBot Podcast is a daily personalized research briefing service for researchers. It surfaces the single most relevant and impactful recent publication from the scientific literature based on the researcher's profile, generates a structured text summary highlighting findings and tools useful to their ongoing work, and produces a short audio episode via Mistral AI TTS. Researchers can subscribe to a personal RSS podcast feed to listen to the audio. + +The system runs once per day and requires no researcher interaction to be useful — but researchers can tune it through a web UI. 
There are two delivery paths: + +- **Agent path** — pilot-lab PIs with an approved `AgentRegistry` entry additionally receive the text summary as a Slack DM from their lab bot. +- **User path** — any researcher who has completed ORCID onboarding and has a `ResearcherProfile` with a research summary receives the podcast automatically. No Slack bot, agent approval, or admin action required. + +--- + +## Architecture + +### Service Placement + +LabBot Podcast runs as a separate Docker container (`podcast` service), mirroring the GrantBot pattern: +- Long-running scheduler process +- Executes once per calendar day at 9am UTC (1 hour after GrantBot) +- If the container was down at the scheduled time, runs immediately on startup (catch-up) +- State persisted in `data/podcast_state.json` (tracks which articles have been delivered per agent) + +### Delivery Paths + +| Path | Who | Profile source | Delivery | Audio/RSS key | +|---|---|---|---|---| +| **Agent** | Pilot-lab PIs with active `AgentRegistry` | `profiles/public/{agent_id}.md` (disk) | Slack DM + RSS | `agent_id` string | +| **User** | Any ORCID user with completed `ResearcherProfile` | `ResearcherProfile` DB row (structured fields) | RSS only | `user_id` UUID | + +Both paths run in the same daily scheduler pass. A user who has both a `ResearcherProfile` and an active agent is handled only by the agent path (no duplicate episode). 
+ +### Dependencies on Existing Systems + +| Existing component | How Podcast uses it | +|---|---| +| `ResearcherProfile` DB model | Source of research areas, keywords, techniques, disease areas for the user path | +| `profiles/public/{lab}.md` | Profile text for the agent path (LLM article selection and summary) | +| `src/services/pubmed.py` | Literature search (keyword + MeSH queries) | +| `src/services/llm.py` | Article selection ranking and summary generation (all calls logged to `LlmCallLog`) | +| `AgentRegistry` | Maps agent → PI → Slack bot token for DM delivery (agent path only) | +| `User.id` (UUID) | Stable, opaque RSS feed token for the user path | +| Slack bot DM | Text summary delivery (agent path only) | + +### New External Dependency + +**Mistral AI API** — text-to-speech generation. +- Configured via `MISTRAL_API_KEY` environment variable +- Voice selection per agent configured in `data/podcast_voices.json` (agent_id → voice_id); falls back to a default voice if not set +- Audio files stored at `data/podcast_audio/{agent_id}/{YYYY-MM-DD}.mp3` + +--- + +## Daily Pipeline + +Each day the scheduler runs two loops in sequence: + +1. **Agent loop** — iterates over all active `AgentRegistry` entries and calls `run_pipeline_for_agent()` for each. +2. **User loop** — iterates over all `User` rows where `onboarding_complete=True` and `profile.research_summary IS NOT NULL`, skipping any whose `user_id` appeared in the agent loop, and calls `run_podcast_for_user()` for each. + +For each recipient, the pipeline executes the following steps sequentially: + +### Step 1: Load Profile + +- **Agent path**: read `profiles/public/{agent_id}.md` from disk. If absent, skip. +- **User path**: construct profile text from structured `ResearcherProfile` DB fields (`research_summary`, `disease_areas`, `techniques`, `experimental_models`, `keywords`). If `research_summary` is empty, skip. 
+ +### Step 2: Build Search Queries + +Construct PubMed search terms from the profile: +- Extract top research area keywords +- Extract technique and experimental model terms +- Combine into 2–3 PubMed query strings (e.g., `(proteostasis OR unfolded protein response) AND (neurodegeneration OR proteomics)`) +- Inject any `extra_keywords` from `PodcastPreferences` as additional quoted terms +- Limit to publications from the last 14 days (rolling window ensures coverage across weekend/holiday gaps) +- Cap at 50 candidate abstracts + +### Step 3: Fetch Candidate Abstracts + +Use `src/services/pubmed.py` to execute each query and retrieve PMIDs + abstracts. Deduplicate across queries. Skip any PMID already in `podcast_state.json` for this recipient (agent or user) to prevent re-delivering the same article. + +### Step 4: LLM Article Selection (Sonnet) + +Single LLM call (Sonnet) with: +- The researcher's full profile text (disk for agent path; constructed from DB for user path) +- The list of candidate abstracts (title + abstract text, numbered) +- Any journal preferences from `PodcastPreferences` +- Prompt: `prompts/podcast-select.md` + +The LLM returns the index of the single best article, along with a one-sentence justification of why it is relevant to this researcher's ongoing work. If no article meets a minimum relevance threshold, it returns `null` and the pipeline skips delivery today. + +### Step 5: Generate Text Summary (Opus) + +One LLM call (Opus) with: +- The researcher's full profile text +- The selected article's title, abstract, and full text (fetched via `retrieve_full_text` if available in PMC, otherwise abstract only) +- Prompt: `prompts/podcast-summarize.md` + +Output is a structured text summary (see format below). This is used as the TTS input and stored in `PodcastEpisode.text_summary`. 
+ +### Step 6: Generate Audio (Mistral AI) + +Pass the text summary to the Mistral AI TTS API: +- Voice: from `PodcastPreferences.voice_id`, or `MISTRAL_TTS_DEFAULT_VOICE` +- Model: configurable via `MISTRAL_TTS_MODEL` +- Output: MP3 file saved to: + - Agent path: `data/podcast_audio/{agent_id}/{YYYY-MM-DD}.mp3` + - User path: `data/podcast_audio/users/{user_id}/{YYYY-MM-DD}.mp3` +- If TTS fails, the episode DB row is **not** written (see commit-last ordering); the run returns `False`. + +### Step 7: Deliver via Slack DM _(agent path only)_ + +Send the text summary as a DM from the agent's Slack bot to its PI, appending the RSS feed URL. User-path episodes are delivered via RSS only — no Slack bot is required. + +### Step 8: Persist Episode and Update State + +1. Write the `PodcastEpisode` row to the DB: + - Agent path: `agent_id` set, `user_id` NULL + - User path: `user_id` set, `agent_id` NULL +2. Append the delivered PMID to `data/podcast_state.json` (keyed by `agent_id` or `user_id`) to prevent re-delivery. + +--- + +## Text Summary Format + +The Opus-generated summary follows a consistent structure. The prompt enforces this layout: + +``` +*Today's Research Brief — {Date}* + +*{Paper Title}* +{Authors} · {Journal} · {Year} + +*What they found:* +2–3 sentences on the core findings — specific results, effect sizes, or observations. + +*Key output:* +1–2 sentences on any tool, method, dataset, or reagent released with the paper (if applicable). Omit this section if the paper has no distinct output. + +*Why this matters for your lab:* +2–3 sentences connecting the paper's findings and outputs specifically to the PI's ongoing research areas, techniques, or open questions. Ground this in the PI's profile — name specific techniques, model systems, or questions from their work. 
+ +*PubMed:* https://pubmed.ncbi.nlm.nih.gov/{PMID}/ +``` + +The Slack DM appends a line at the bottom: +> _Listen to the audio version: {rss_feed_url}_ + +--- + +## RSS Podcast Feed + +### Endpoints + +| Path | Auth | Key | +|---|---|---| +| `GET /podcast/{agent_id}/feed.xml` | None | Pilot-lab agent | +| `GET /podcast/{agent_id}/audio/{date}.mp3` | None | Pilot-lab agent | +| `GET /podcast/users/{user_id}/feed.xml` | None | Plain ORCID user | +| `GET /podcast/users/{user_id}/audio/{date}.mp3` | None | Plain ORCID user | + +All four endpoints are public and unauthenticated. The `user_id` UUID is opaque and acts as a stable, subscribable feed token — equivalent to a private podcast URL. Users retrieve their feed URL from the `/podcast/settings` page. + +### Feed Structure + +Standard RSS 2.0 with iTunes podcast extensions (identical structure for both paths): + +```xml + + + {Name} — LabBot Research Briefings + Daily personalized research summaries for {Name}. + {feed_url} + {Name} + + + {Paper Title} — {Date} + {text summary} + + {RFC 822 date} + {agent_id|user-{user_id}}-{YYYY-MM-DD} + {duration} + + ... + + +``` + +### Audio File Storage + +| Path | Audio directory | +|---|---| +| Agent path | `data/podcast_audio/{agent_id}/{YYYY-MM-DD}.mp3` | +| User path | `data/podcast_audio/users/{user_id}/{YYYY-MM-DD}.mp3` | + +Files are streamed with `Content-Type: audio/mpeg`. + +--- + +## LLM Prompt Files + +Two new prompt files in `prompts/`: + +### `prompts/podcast-select.md` + +Instructs the LLM to act as a literature triage assistant for a specific PI. 
It receives: +- The PI's public profile (research areas, techniques, open questions, unique capabilities) +- Numbered list of candidate abstracts (title + abstract) + +It must return: +- The number of the most relevant article, or `null` if none clears the relevance bar +- A one-sentence justification referencing a specific aspect of the PI's profile + +Key instructions in the prompt: +- Relevance is defined as: the paper's findings or outputs could plausibly accelerate or inform a specific aspect of the PI's ongoing work +- Recency alone is not sufficient — the connection must be specific +- Prefer papers that release a tool, method, dataset, or reagent alongside findings +- Do not pick review articles or editorials + +### `prompts/podcast-summarize.md` + +Instructs the LLM to act as a science communicator writing for a specific PI. It receives: +- The PI's public profile +- Full paper text (or abstract if full text unavailable) + +It must produce the structured summary described above. Key instructions: +- The "Why this matters for your lab" section must name specific techniques, model systems, or open questions from the PI's profile — no generic connections +- Tone is like a knowledgeable postdoc briefing their PI: specific, direct, no filler +- The "Key output" section is only included if the paper releases a concrete artifact (tool, code, dataset, method, reagent); skip it otherwise +- Target length: ~250 words total + +--- + +## Data Model + +### `PodcastEpisode` + +Rows are keyed by either `agent_id` (string) or `user_id` (UUID FK to `users.id`). Exactly one should be set per row. 
+ +```python +class PodcastEpisode(Base): + __tablename__ = "podcast_episodes" + + id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4) + agent_id: Mapped[str | None] = mapped_column(String(50), nullable=True, index=True) + user_id: Mapped[uuid.UUID | None] = mapped_column(UUID, ForeignKey("users.id"), nullable=True, index=True) + episode_date: Mapped[date] = mapped_column(Date, nullable=False) + pmid: Mapped[str] = mapped_column(String(100), nullable=False) + paper_title: Mapped[str] = mapped_column(String(500), nullable=False) + paper_authors: Mapped[str] = mapped_column(String(500), nullable=False) + paper_journal: Mapped[str] = mapped_column(String(255), nullable=False) + paper_year: Mapped[int] = mapped_column(Integer, nullable=False) + paper_url: Mapped[str | None] = mapped_column(String(1000), nullable=True) + text_summary: Mapped[str] = mapped_column(Text, nullable=False) + audio_file_path: Mapped[str | None] = mapped_column(String(500), nullable=True) + audio_duration_seconds: Mapped[int | None] = mapped_column(Integer, nullable=True) + slack_delivered: Mapped[bool] = mapped_column(Boolean, default=False) + selection_justification: Mapped[str] = mapped_column(Text, nullable=False) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now()) + + __table_args__ = ( + # Agent-path: one episode per agent per day + UniqueConstraint("agent_id", "episode_date", name="uq_podcast_agent_date"), + # User-path: enforced by partial unique index (migration 0013): + # CREATE UNIQUE INDEX ix_podcast_episodes_user_date + # ON podcast_episodes (user_id, episode_date) WHERE user_id IS NOT NULL + ) +``` + +### `PodcastPreferences` + +Rows are keyed by either `agent_id` or `user_id`. Both columns are nullable and uniquely indexed. 
+ +```python +class PodcastPreferences(Base): + __tablename__ = "podcast_preferences" + + id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4) + agent_id: Mapped[str | None] = mapped_column(String(50), nullable=True, unique=True, index=True) + user_id: Mapped[uuid.UUID | None] = mapped_column(UUID, ForeignKey("users.id"), nullable=True, unique=True, index=True) + voice_id: Mapped[str | None] = mapped_column(String(100), nullable=True) + extra_keywords: Mapped[list[str]] = mapped_column(ARRAY(String), server_default="{}") + preferred_journals: Mapped[list[str]] = mapped_column(ARRAY(String), server_default="{}") + deprioritized_journals: Mapped[list[str]] = mapped_column(ARRAY(String), server_default="{}") + updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now()) +``` + +### State File (`data/podcast_state.json`) + +Keyed separately for agents and users: + +```json +{ + "agents": { + "": { "delivered_pmids": ["12345", "67890"] } + }, + "users": { + "": { "delivered_pmids": ["11111"] } + }, + "last_run_date": "2026-04-14" +} +``` + +The state file is a lightweight deduplication cache. The DB is the authoritative record for RSS generation and admin visibility. 
### Alembic Migrations + +| Migration | Creates / alters | +|---|---| +| `0010_add_podcast_episodes.py` | `podcast_episodes` table (agent path) | +| `0011_add_podcast_paper_url.py` | `paper_url` column | +| `0012_add_podcast_preferences.py` | `podcast_preferences` table (agent path) | +| `0013_podcast_user_support.py` | `user_id` FK on both tables; make `agent_id` nullable; partial unique index for user-path episodes | + +--- + +## Configuration + +New environment variables: + +| Variable | Required | Description | +|---|---|---| +| `MISTRAL_API_KEY` | Yes (for audio) | Mistral AI API key | +| `MISTRAL_TTS_MODEL` | No | TTS model ID (default: `voxtral-mini-tts-latest`) | +| `MISTRAL_TTS_DEFAULT_VOICE` | No | Default voice when no per-agent override exists | +| `PODCAST_BASE_URL` | Yes | Public base URL for RSS enclosure links (e.g., `https://copi.science`) | +| `PODCAST_SEARCH_WINDOW_DAYS` | No | Rolling search window in days (default: `14`) | +| `PODCAST_MAX_CANDIDATES` | No | Max PubMed abstracts per agent per day (default: `50`) | + +Per-agent voice overrides (Phase 2/3): `data/podcast_voices.json` +```json +{ + "su": "alex", + "wiseman": "stella" +} +``` +**Deprecated in Phase 4** — voice preferences move to the `podcast_preferences` DB table. The JSON file is still read as a fallback while the migration is in progress. + +--- + +## Docker Service + +Add `podcast` service to `docker-compose.yml` and `docker-compose.prod.yml`: + +```yaml +podcast: + build: . 
+ command: python -m src.podcast.main + env_file: .env + volumes: + - ./data:/app/data + depends_on: + - postgres + profiles: + - podcast +``` + +Run with: `docker compose --profile podcast up -d podcast` + +--- + +## Module Structure + +``` +src/podcast/ +├── main.py # Scheduler entry point (APScheduler, same pattern as grantbot.py) +├── pipeline.py # Per-agent pipeline (steps 1–8 above) +├── pubmed_search.py # Query builder from ResearcherProfile +├── mistral_tts.py # Mistral AI TTS client wrapper +├── rss.py # RSS feed builder (reads from DB) +└── state.py # podcast_state.json read/write helpers + +src/routers/podcast.py # FastAPI routes: /podcast/{agent_id}/feed.xml, /podcast/{agent_id}/audio/{date}.mp3 +``` + +The scheduler in `src/podcast/main.py` follows the same catch-up-on-startup pattern as `src/agent/grantbot.py`: +1. On startup, check `data/podcast_state.json` for last run timestamp +2. If last run was before today's 9am UTC, run immediately +3. Schedule next run at 9am UTC + +--- + +## Admin Dashboard Integration + +Add a **Podcast** tab to the existing admin dashboard (`src/routers/admin.py` + `templates/admin.html`) showing: +- Table of recent episodes: agent, date, paper title, PMID, Slack delivered (yes/no), audio generated (yes/no) +- Link to each agent's RSS feed +- LLM call counts and token usage for the podcast pipeline (pulled from `LlmCallLog` filtered by `source = "podcast"`) + +The LLM calls from the podcast pipeline should set a `source` tag in `LlmCallLog` (add a `source` column via migration if not already present, or use the existing `extra_metadata` JSONB field). + +--- + +## PI Customization + +### Via Standing Instructions (Current) + +PIs can adjust podcast behavior through standing instructions to their lab bot (same DM mechanism as the agent system — see `pi-interaction.md`). The podcast pipeline reads the private profile when building the selection prompt. 
+ +Examples of effective standing instructions: +- "For my daily podcast, focus only on papers that release a new tool or dataset — I don't need summaries of pure wet-lab findings" +- "Prioritize papers from computational biology journals for the podcast" +- "Skip anything about C. elegans — we're not pursuing that direction anymore" + +The bot's private profile rewrite (via `prompts/pi-profile-rewrite.md`) should include a `## Podcast Preferences` section that the podcast pipeline reads when constructing the selection and summarization prompts. + +### Via Preferences UI (Phase 4) + +A structured preferences page at `/agent/{agent_id}/podcast-settings` replaces the `data/podcast_voices.json` file and augments the standing-instructions mechanism with three explicit controls: + +1. **Voice** — select the TTS voice used for audio generation +2. **Extra search keywords** — additional terms appended to PubMed/preprint queries beyond the auto-extracted profile keywords +3. **Source preferences** — journals or preprint servers to prioritize (boosted in the selection prompt) or deprioritize + +See the **Podcast Preferences UI** section below for the full design. 
+ +--- + +## Podcast Preferences UI + +### Route and Access Control + +| Route | Method | Handler | Access | Notes | +|---|---|---|---|---| +| `/agent/{agent_id}/podcast-settings` | `GET` | Render agent preferences form | Agent owner or admin | Agent path | +| `/agent/{agent_id}/podcast-settings` | `POST` | Save agent preferences | Agent owner or admin | Agent path | +| `/podcast/settings` | `GET` | Render user preferences form | Any authenticated user with completed profile | User path | +| `/podcast/settings` | `POST` | Save user preferences | Any authenticated user with completed profile | User path | +| `/podcast/user/generate` | `POST` | Trigger on-demand episode | Any authenticated user with completed profile | User path | + +The agent-path routes remain in `src/routers/agent_page.py` with the same `get_agent_with_access()` ownership check. The user-path routes live in `src/routers/podcast.py` and use `get_current_user()` + a profile-completeness check (`onboarding_complete=True` and `profile.research_summary IS NOT NULL`). + +### User Feed URL + +After saving preferences or visiting `/podcast/settings`, the user sees their personal feed URL: + +``` +{PODCAST_BASE_URL}/podcast/users/{user.id}/feed.xml +``` + +This URL: +- Requires no authentication to read (subscribe in any podcast app) +- Is stable for the lifetime of the user account +- Acts as an opaque token — not guessable, not secret, but not publicly listed +- Is displayed with a one-click copy button on the settings page + +### Form Fields + +#### 1. Voice Selection + +A ` +

+ {% if prefs and prefs.extra_keywords %}{{ prefs.extra_keywords | length }} keyword(s) saved.{% else %}No extra keywords set.{% endif %} +

+ + + +
+

Source Preferences

+

+ Guide the article selection by telling the AI which journals or preprint servers + to prioritize or avoid. One source per line (or comma-separated). +

+ +
+ + +
+ +
+ + +
+
+ +
+ + + Cancel + +
+

+ Changes take effect on the next scheduled podcast run (daily at 9am UTC). +

+ + +{% endblock %} diff --git a/templates/base.html b/templates/base.html index 74db818..70af5f4 100644 --- a/templates/base.html +++ b/templates/base.html @@ -86,6 +86,7 @@ Activity Discussions Agents + Podcast Access Waitlist diff --git a/templates/podcast_settings.html b/templates/podcast_settings.html new file mode 100644 index 0000000..59f8a1b --- /dev/null +++ b/templates/podcast_settings.html @@ -0,0 +1,155 @@ +{% extends "base.html" %} +{% block title %}Podcast Settings — CoPI{% endblock %} + +{% block content %} +
+
+
+ ← My Profile +

Podcast Settings

+

Customize your daily LabBot research briefing

+
+
+ + {% if saved %} +
+ Preferences saved successfully. +
+ {% endif %} + + +
+

Your Podcast Feed URL

+

+ Subscribe to this URL in any podcast app (Apple Podcasts, Overcast, Pocket Casts, etc.) + to receive audio episodes automatically. The URL is stable and does not require login. +

+
+ + +
+

+ New episodes are generated daily at 9am UTC. You can also + . +

+ +
+ +
+ + +
+

Voice

+

+ Select the text-to-speech voice used for your audio episodes. + Voices are from Mistral AI's voxtral-mini-tts-latest model. +

+ +
+ + +
+

Extra Search Keywords

+

+ Additional terms to include in the daily literature search, beyond what is + auto-extracted from your profile. One keyword or phrase per line (max 20). + These are added as quoted PubMed search terms. +

+ +

+ {% if prefs and prefs.extra_keywords %}{{ prefs.extra_keywords | length }} keyword(s) saved.{% else %}No extra keywords set.{% endif %} +

+
+ + +
+

Source Preferences

+

+ Guide the article selection by telling the AI which journals or preprint servers + to prioritize or avoid. One source per line (or comma-separated). +

+ +
+ + +
+ +
+ + +
+
+ +
+ + + Cancel + +
+

+ Changes take effect on the next scheduled podcast run (daily at 9am UTC). +

+
+
+{% endblock %} + +{% block scripts %} + +{% endblock %} diff --git a/tests/test_podcast.py b/tests/test_podcast.py new file mode 100644 index 0000000..76e6138 --- /dev/null +++ b/tests/test_podcast.py @@ -0,0 +1,343 @@ +"""Unit tests for podcast pipeline pure-logic functions and RSS builder.""" + +import json +import os +import tempfile +from datetime import date +from pathlib import Path +from types import SimpleNamespace + +import pytest + +from src.podcast.pubmed_search import build_queries +from src.podcast.pipeline import ( + _format_candidates_for_prompt, + _extract_section_text, + _build_profile_text_from_db, +) +from src.podcast.rss import build_feed +from src.podcast.state import ( + get_delivered_pmids, + record_delivery, + get_delivered_pmids_for_user, + record_delivery_for_user, +) + + +# --------------------------------------------------------------------------- +# build_queries +# --------------------------------------------------------------------------- + +class TestBuildQueries: + def test_disease_areas_produce_query(self): + profile = {"disease_areas": ["neurodegeneration", "Alzheimer's disease"], "techniques": [], "experimental_models": [], "keywords": []} + queries = build_queries(profile) + assert len(queries) >= 1 + assert "neurodegeneration" in queries[0] + + def test_techniques_produce_second_query(self): + profile = { + "disease_areas": ["cancer"], + "techniques": ["CRISPR", "flow cytometry"], + "experimental_models": [], + "keywords": [], + } + queries = build_queries(profile) + assert len(queries) >= 2 + assert any("CRISPR" in q for q in queries) + + def test_keywords_produce_third_query(self): + profile = { + "disease_areas": ["diabetes"], + "techniques": ["proteomics"], + "experimental_models": [], + "keywords": ["insulin signaling", "beta cell"], + } + queries = build_queries(profile) + assert len(queries) >= 3 + assert any("insulin signaling" in q or "beta cell" in q for q in queries) + + def test_empty_profile_returns_empty(self): + 
queries = build_queries({}) + assert queries == [] + + def test_fallback_to_research_summary(self): + profile = {"research_summary": "Studying ribosome biogenesis mechanisms"} + queries = build_queries(profile) + assert len(queries) == 1 + + def test_queries_are_quoted_terms(self): + profile = {"disease_areas": ["proteostasis"], "techniques": [], "experimental_models": [], "keywords": []} + queries = build_queries(profile) + assert '"proteostasis"' in queries[0] + + +# --------------------------------------------------------------------------- +# _format_candidates_for_prompt +# --------------------------------------------------------------------------- + +class TestFormatCandidates: + def test_numbers_candidates_from_one(self): + records = [ + {"title": "Paper A", "abstract": "Abstract A", "journal": "Nature", "year": 2024}, + {"title": "Paper B", "abstract": "Abstract B", "journal": "Science", "year": 2024}, + ] + text = _format_candidates_for_prompt(records) + assert text.startswith("1.") + assert "2." in text + + def test_includes_title_and_abstract(self): + records = [{"title": "CRISPR therapy", "abstract": "We developed a new approach.", "journal": "Cell", "year": 2025}] + text = _format_candidates_for_prompt(records) + assert "CRISPR therapy" in text + assert "We developed a new approach." 
in text + + def test_truncates_long_abstract(self): + long_abstract = "x" * 1000 + records = [{"title": "T", "abstract": long_abstract, "journal": "J", "year": 2024}] + text = _format_candidates_for_prompt(records) + assert len(text) < 1000 # abstract truncated to 600 chars + + def test_handles_missing_fields(self): + records = [{"title": "Minimal record"}] + text = _format_candidates_for_prompt(records) + assert "Minimal record" in text + assert "No abstract" in text + + +# --------------------------------------------------------------------------- +# _extract_section_text +# --------------------------------------------------------------------------- + +class TestExtractSectionText: + SAMPLE_MD = """## Research Summary +We study protein folding in neurons. + +## Key Methods and Technologies +- Cryo-EM +- Mass spectrometry + +## Podcast Preferences +Focus on computational tools only. +""" + + def test_extracts_research_summary(self): + text = _extract_section_text(self.SAMPLE_MD, "Research Summary") + assert "protein folding" in text + + def test_extracts_podcast_preferences(self): + text = _extract_section_text(self.SAMPLE_MD, "Podcast Preferences") + assert "computational tools" in text + + def test_stops_at_next_section(self): + text = _extract_section_text(self.SAMPLE_MD, "Research Summary") + assert "Cryo-EM" not in text + + def test_missing_section_returns_empty(self): + text = _extract_section_text(self.SAMPLE_MD, "Nonexistent Section") + assert text == "" + + +# --------------------------------------------------------------------------- +# RSS feed builder +# --------------------------------------------------------------------------- + +def _make_episode(**kwargs): + """Create a minimal PodcastEpisode-like object for RSS tests.""" + defaults = dict( + episode_date=date(2026, 4, 10), + paper_title="A Great Paper", + paper_authors="Smith J et al.", + paper_journal="Nature", + paper_year=2026, + pmid="12345678", + paper_url=None, + text_summary="This paper 
found something important.", + audio_file_path=None, + audio_duration_seconds=None, + slack_delivered=True, + selection_justification="Highly relevant to the PI's work.", + ) + defaults.update(kwargs) + return SimpleNamespace(**defaults) + + +class TestBuildFeed: + # --- agent path --- + + def test_returns_valid_xml_root(self): + xml = build_feed("Jane Smith", [], "https://example.com", agent_id="testagent") + assert xml.startswith("1:30" in xml + + def test_no_enclosure_when_no_audio(self): + ep = _make_episode(audio_file_path=None) + xml = build_feed("Jane Smith", [ep], "https://example.com", agent_id="testagent") + assert "") + xml = build_feed("Jane Smith", [ep], "https://example.com", agent_id="testagent") + assert "Proteins & <Stuff>" in xml + + def test_empty_episodes_list(self): + xml = build_feed("Jane Smith", [], "https://example.com", agent_id="testagent") + assert "" not in xml + + def test_agent_guid_format(self): + ep = _make_episode() + xml = build_feed("Jane Smith", [ep], "https://example.com", agent_id="testagent") + assert "testagent-2026-04-10" in xml + + # --- user path --- + + def test_user_feed_url_uses_user_id(self): + uid = "11111111-2222-3333-4444-555555555555" + xml = build_feed("Alice Brown", [], "https://example.com", user_id=uid) + assert f"/podcast/users/{uid}/feed.xml" in xml + + def test_user_feed_has_correct_pi_name(self): + uid = "11111111-2222-3333-4444-555555555555" + xml = build_feed("Alice Brown", [], "https://example.com", user_id=uid) + assert "Alice Brown" in xml + + def test_user_audio_url_uses_user_path(self, tmp_path): + uid = "11111111-2222-3333-4444-555555555555" + audio_file = tmp_path / "2026-04-10.mp3" + audio_file.write_bytes(b"\x00" * 500) + ep = _make_episode(audio_file_path=str(audio_file)) + xml = build_feed("Alice Brown", [ep], "https://example.com", user_id=uid) + assert f"/podcast/users/{uid}/audio/2026-04-10.mp3" in xml + + def test_user_guid_format(self): + uid = "11111111-2222-3333-4444-555555555555" + ep 
= _make_episode() + xml = build_feed("Alice Brown", [ep], "https://example.com", user_id=uid) + assert f"user-{uid}-2026-04-10" in xml + + +# --------------------------------------------------------------------------- +# State helpers — user path +# --------------------------------------------------------------------------- + +class TestUserState: + def test_new_user_has_empty_delivered_set(self, tmp_path, monkeypatch): + monkeypatch.setattr("src.podcast.state.STATE_FILE", tmp_path / "state.json") + result = get_delivered_pmids_for_user("user-uuid-abc") + assert result == set() + + def test_record_and_retrieve_user_delivery(self, tmp_path, monkeypatch): + monkeypatch.setattr("src.podcast.state.STATE_FILE", tmp_path / "state.json") + record_delivery_for_user("user-uuid-abc", "12345") + record_delivery_for_user("user-uuid-abc", "67890") + result = get_delivered_pmids_for_user("user-uuid-abc") + assert result == {"12345", "67890"} + + def test_user_and_agent_state_are_independent(self, tmp_path, monkeypatch): + monkeypatch.setattr("src.podcast.state.STATE_FILE", tmp_path / "state.json") + record_delivery("myagent", "11111") + record_delivery_for_user("user-uuid-abc", "22222") + assert get_delivered_pmids("myagent") == {"11111"} + assert get_delivered_pmids_for_user("user-uuid-abc") == {"22222"} + # no cross-contamination + assert "22222" not in get_delivered_pmids("myagent") + assert "11111" not in get_delivered_pmids_for_user("user-uuid-abc") + + def test_duplicate_pmid_not_added_twice(self, tmp_path, monkeypatch): + monkeypatch.setattr("src.podcast.state.STATE_FILE", tmp_path / "state.json") + record_delivery_for_user("user-uuid-abc", "99999") + record_delivery_for_user("user-uuid-abc", "99999") + raw = json.loads((tmp_path / "state.json").read_text()) + assert raw["users"]["user-uuid-abc"]["delivered_pmids"].count("99999") == 1 + + def test_atomic_write_leaves_valid_json(self, tmp_path, monkeypatch): + monkeypatch.setattr("src.podcast.state.STATE_FILE", tmp_path / 
"state.json") + record_delivery_for_user("u1", "aaa") + content = (tmp_path / "state.json").read_text() + parsed = json.loads(content) # must be valid JSON + assert "users" in parsed + + +# --------------------------------------------------------------------------- +# _build_profile_text_from_db +# --------------------------------------------------------------------------- + +class TestBuildProfileTextFromDb: + def _make_user(self, **kwargs): + defaults = dict(name="Dr. Alice", institution="MIT", department="Biology") + defaults.update(kwargs) + return SimpleNamespace(**defaults) + + def _make_profile(self, **kwargs): + defaults = dict( + research_summary="We study protein aggregation.", + disease_areas=["Alzheimer's", "Parkinson's"], + techniques=["cryo-EM", "mass spectrometry"], + experimental_models=["mouse", "iPSC"], + keywords=["proteostasis", "neurodegeneration"], + ) + defaults.update(kwargs) + return SimpleNamespace(**defaults) + + def test_includes_user_name(self): + text = _build_profile_text_from_db(self._make_user(), self._make_profile()) + assert "Dr. 
Alice" in text + + def test_includes_research_summary(self): + text = _build_profile_text_from_db(self._make_user(), self._make_profile()) + assert "protein aggregation" in text + + def test_includes_disease_areas(self): + text = _build_profile_text_from_db(self._make_user(), self._make_profile()) + assert "Alzheimer" in text + + def test_includes_techniques(self): + text = _build_profile_text_from_db(self._make_user(), self._make_profile()) + assert "cryo-EM" in text + + def test_handles_none_fields_gracefully(self): + profile = self._make_profile(disease_areas=None, techniques=None, keywords=None) + text = _build_profile_text_from_db(self._make_user(), profile) + assert "protein aggregation" in text # summary still present + + def test_handles_missing_institution(self): + user = self._make_user(institution=None, department=None) + text = _build_profile_text_from_db(user, self._make_profile()) + assert "Dr. Alice" in text