diff --git a/docs/IMPLEMENTATION.md b/docs/IMPLEMENTATION.md index 8fd1dfd..406bbf0 100644 --- a/docs/IMPLEMENTATION.md +++ b/docs/IMPLEMENTATION.md @@ -159,7 +159,10 @@ def load_config() -> AppConfig: ... """Load QSettings + DPAPI. Unknown keys get field defaults.""" def save_config(cfg: AppConfig) -> None: ... - """Persist to QSettings + DPAPI. api_key fields go through DPAPI.""" + """Persist to QSettings + DPAPI. api_key and custom-header fields go through DPAPI.""" + +def has_plaintext_secrets() -> bool: ... + """True if any secret field still sits as plaintext in settings.ini.""" def reset_config() -> AppConfig: ... """Fresh AppConfig with all defaults. Does not write disk.""" @@ -282,7 +285,7 @@ class RecordingSnackbar(QWidget): ```python class PasswordField(QLineEdit): - """Password line edit that reveals text only while focused.""" + """Masked line edit with an explicit trailing show/hide toggle action.""" class SettingsDialog(QDialog): def __init__( @@ -477,5 +480,5 @@ Do not proceed to Phase 2 until review passes. - No modules beyond the 11 listed. No new dependencies. - Do not implement packaging (PyInstaller), code signing, or cross-platform hotkey backends. - Autostart registration is implemented in `startup.py`. -- All paths: `%LOCALAPPDATA%/Screamer/`. API keys: DPAPI. Plain settings: QSettings (IniFormat). +- All paths: `%LOCALAPPDATA%/Screamer/`. API keys + custom headers: DPAPI. Plain settings: QSettings (IniFormat). - If a Phase 2 bug forces a Phase 1 API change, document it in the review checkpoint and get re-approval. diff --git a/src/config.py b/src/config.py index 3c947f3..1546892 100644 --- a/src/config.py +++ b/src/config.py @@ -6,6 +6,7 @@ import logging import os import platform +import sys from dataclasses import dataclass, field, fields from logging.handlers import RotatingFileHandler from urllib.parse import urlsplit @@ -304,12 +305,17 @@ def llm_fallback_provider(self) -> FallbackProviderConfig: ) -# Fields that contain secret API keys and must go through DPAPI. +# Fields that contain secrets and must go through DPAPI: API keys, plus custom +# headers (which routinely carry tokens such as X-Api-Key). _SECRET_FIELDS = frozenset({ "stt_api_key", "stt_fallback_api_key", "llm_api_key", "llm_fallback_api_key", + "stt_custom_headers", + "stt_fallback_custom_headers", + "llm_custom_headers", + "llm_fallback_custom_headers", }) # DPAPI entropy string bound to this application. @@ -482,12 +488,25 @@ def save_config(cfg: AppConfig) -> None: settings = _get_qsettings() for f in fields(AppConfig): if f.name in _SECRET_FIELDS: + # Purge any plaintext value an older version left in the ini, so it + # cannot shadow the DPAPI-stored value on the next load. + settings.remove(f.name) continue settings.setValue(f.name, getattr(cfg, f.name)) settings.sync() _save_secrets(cfg) +def has_plaintext_secrets() -> bool: + """True if any secret field still sits as plaintext in settings.ini. + + Older versions wrote custom headers to the ini; save_config purges them, + but the purge only happens on save — callers use this to force one. + """ + settings = _get_qsettings() + return any(settings.contains(name) for name in _SECRET_FIELDS) + + def reset_config() -> AppConfig: """Fresh AppConfig with all defaults. Does not write disk.""" return AppConfig() @@ -560,15 +579,23 @@ def validate_config(cfg: AppConfig) -> list[ConfigValidationIssue]: return issues +def _env_path() -> str: + """Locate .env: next to the executable when frozen, else at cwd (dev runs).""" + if getattr(sys, "frozen", False): + return os.path.join(os.path.dirname(sys.executable), ".env") + return os.path.join(os.getcwd(), ".env") + + def import_from_env(cfg: AppConfig) -> AppConfig: - """Read .env at cwd; backfill ONLY empty str fields. No-op if no .env file.""" + """Read .env (exe dir when frozen, else cwd); backfill ONLY empty str fields. + No-op if no .env file.""" try: from dotenv import dotenv_values except ImportError: log.debug("python-dotenv not installed; skipping .env import") return cfg - env_path = os.path.join(os.getcwd(), ".env") + env_path = _env_path() if not os.path.exists(env_path): return cfg diff --git a/src/injector.py b/src/injector.py index be6e190..89d566a 100644 --- a/src/injector.py +++ b/src/injector.py @@ -19,6 +19,12 @@ } +def _utf16_units(value: str) -> list[str]: + """Split *value* into UTF-16 code units (surrogate pairs become two units).""" + encoded = value.encode("utf-16-le", errors="surrogatepass") + return [chr(int.from_bytes(encoded[i : i + 2], "little")) for i in range(0, len(encoded), 2)] + + def type_text(text: str, post_key: str | None = None) -> None: """Type *text* into the active window via Win32 SendInput. @@ -90,14 +96,6 @@ def _raise_sendinput_failed(detail: str) -> None: detail = f"{detail} (WinError {err})" raise ScreamerError(AppError.INJECTION_FAILED, detail) - def _send_unicode(char: str, key_up: bool = False) -> None: - inp = INPUT() - inp.type = INPUT_KEYBOARD - inp.ki.wScan = ord(char) - inp.ki.dwFlags = KEYEVENTF_UNICODE | (KEYEVENTF_KEYUP if key_up else 0) - if user32.SendInput(1, ctypes.byref(inp), ctypes.sizeof(INPUT)) != 1: - _raise_sendinput_failed(f"SendInput failed for U+{ord(char):04X}") - def _send_vk(vk: int, key_up: bool = False) -> None: inp = INPUT() inp.type = INPUT_KEYBOARD @@ -106,16 +104,30 @@ def _send_vk(vk: int, key_up: bool = False) -> None: if user32.SendInput(1, ctypes.byref(inp), ctypes.sizeof(INPUT)) != 1: _raise_sendinput_failed(f"SendInput failed for VK 0x{vk:02X}") - def _utf16_units(value: str) -> list[str]: - encoded = value.encode("utf-16-le", errors="surrogatepass") - return [chr(int.from_bytes(encoded[i : i + 2], "little")) for i in range(0, len(encoded), 2)] - try: with log_duration(log, f"Text injection ({len(text)} chars)"): log.info("Typing %d characters", len(text)) - for ch in _utf16_units(text): - _send_unicode(ch) - _send_unicode(ch, key_up=True) + # One batched SendInput call: atomic with respect to concurrent + # user input and far fewer syscalls than per-character sends. + units = _utf16_units(text) + if units: + count = 2 * len(units) + batch = (INPUT * count)() + for i, unit in enumerate(units): + code = ord(unit) + down = batch[2 * i] + down.type = INPUT_KEYBOARD + down.ki.wScan = code + down.ki.dwFlags = KEYEVENTF_UNICODE + up = batch[2 * i + 1] + up.type = INPUT_KEYBOARD + up.ki.wScan = code + up.ki.dwFlags = KEYEVENTF_UNICODE | KEYEVENTF_KEYUP + sent = user32.SendInput(count, batch, ctypes.sizeof(INPUT)) + # Partial injection (sent < count) means a prefix of the text + # was already typed; there is no rollback — surface the counts. + if sent != count: + _raise_sendinput_failed(f"SendInput injected {sent}/{count} events") # Post-type key with 0.05s delay. if post_key and post_key != "none": diff --git a/src/main.py b/src/main.py index b2d1819..0e10132 100644 --- a/src/main.py +++ b/src/main.py @@ -7,6 +7,7 @@ from __future__ import annotations +import copy import logging import threading from typing import Any @@ -29,6 +30,7 @@ POST_KEY_OPTIONS, AppConfig, Hotkey, + has_plaintext_secrets, import_from_env, load_config, save_config, @@ -116,8 +118,12 @@ def __init__(self, startup_mode: bool = False) -> None: super().__init__() self._config = load_config() - self._config = import_from_env(self._config) - save_config(self._config) + imported = import_from_env(copy.deepcopy(self._config)) + if imported != self._config or has_plaintext_secrets(): + # Save when .env added values, or to purge plaintext secrets an + # older version left in settings.ini (save_config removes them). + self._config = imported + save_config(self._config) self._recorder = AudioRecorder() self._bridge = SignalBridge() @@ -325,8 +331,20 @@ def _finalize_recording(self) -> None: self._worker.succeeded.connect(self._on_worker_succeeded) self._worker.failed.connect(self._on_worker_failed) self._worker.cancelled.connect(self._on_worker_cancelled) + # Result slots run first (queued in emission order), then the QThread + # object frees itself — otherwise one worker leaks per dictation. + self._worker.finished.connect(self._worker.deleteLater) self._worker.start() + def _cancel_recording(self) -> None: + """Stop and discard the in-flight recording without processing it.""" + self._recording = False + try: + self._recorder.stop() + except ScreamerError: + pass + self._apply_state(TrayState.IDLE) + # ------------------------------------------------------------------ # Hotkey callbacks (called from hotkey thread via SignalBridge → Qt main) # ------------------------------------------------------------------ @@ -409,8 +427,14 @@ def _on_error(self, code: AppError, detail: str | None = None) -> None: def _toggle_enabled(self, checked: bool) -> None: self._enabled = checked - if not checked and self._recording: - self._finalize_recording() + if not checked: + if self._recording: + # Disabling means "stop"; don't transcribe and type the leftovers. + self._cancel_recording() + elif self._worker is not None: + # Mid-processing: don't type into the focused window after the + # user disabled us. The worker checks this before each step. + self._cancel_event.set() log.info("Screamer %s", "enabled" if checked else "disabled") def _set_recording_mode(self, mode: str, rebuild_menu: bool = True) -> None: diff --git a/src/rewrite.py b/src/rewrite.py index 23307a7..cff3d6e 100644 --- a/src/rewrite.py +++ b/src/rewrite.py @@ -23,7 +23,7 @@ def rewrite(text: str, config: AppConfig) -> PipelineResult: return PipelineResult(text=text) system_prompt = config.llm_system_prompt or "" - language = getattr(config, "stt_language", "") + language = config.stt_language if language: system_prompt += f"\nThe speech language is {language}." @@ -132,7 +132,7 @@ def _call_llm( def _groq_completion_cap(user_text: str) -> int: estimated_input_tokens = max(1, len(user_text) // 4) cap = int(estimated_input_tokens * 1.5) + 32 - return max(128, min(1024, cap)) + return max(128, min(4096, cap)) # --------------------------------------------------------------------------- diff --git a/src/settings_dialog.py b/src/settings_dialog.py index 81413c6..2a35797 100644 --- a/src/settings_dialog.py +++ b/src/settings_dialog.py @@ -10,9 +10,10 @@ import logging from typing import Callable -from PySide6.QtCore import QCoreApplication, QEvent, Qt, Signal -from PySide6.QtGui import QKeyEvent, QMouseEvent +from PySide6.QtCore import QCoreApplication, QEvent, Qt, QThread, Signal +from PySide6.QtGui import QAction, QIcon, QKeyEvent, QMouseEvent, QPainter, QPalette, QPixmap from PySide6.QtWidgets import ( + QApplication, QCheckBox, QComboBox, QDialog, @@ -56,21 +57,56 @@ # Type alias for device list items: (id, display_name). DeviceItem = tuple[int, str] +_CALIBRATE_LABEL = "Recalibrate RMS Threshold" + + +class _CalibrateThread(QThread): + """Runs the blocking RMS calibration off the UI thread.""" + + succeeded = Signal(float) + failed = Signal(str) + + def __init__(self, fn: Callable[[int | None], float], device_id: int | None, parent=None) -> None: + super().__init__(parent) + self._fn = fn + self._device_id = device_id + + def run(self) -> None: + try: + self.succeeded.emit(self._fn(self._device_id)) + except Exception as e: # surface any calibration failure to the dialog + self.failed.emit(str(e)) + + +def _eye_icon() -> QIcon: + """Small glyph for the show/hide toggle (the project ships no icon assets).""" + pixmap = QPixmap(16, 16) + pixmap.fill(Qt.GlobalColor.transparent) + painter = QPainter(pixmap) + # Default pen is black — invisible on dark themes; follow the palette. + painter.setPen(QApplication.palette().color(QPalette.ColorRole.Text)) + painter.drawText(pixmap.rect(), Qt.AlignmentFlag.AlignCenter, "\N{EYE}") + painter.end() + return QIcon(pixmap) + class PasswordField(QLineEdit): - """Password line edit that reveals text only while focused.""" + """Masked line edit with an explicit trailing show/hide toggle. + + Stays masked on focus; the secret is only revealed while the toggle is on. + """ def __init__(self) -> None: super().__init__() self.setEchoMode(QLineEdit.EchoMode.Password) + reveal = QAction(_eye_icon(), "Show", self) + reveal.setCheckable(True) + reveal.setToolTip("Show/hide value") + reveal.toggled.connect(self._on_reveal_toggled) + self.addAction(reveal, QLineEdit.ActionPosition.TrailingPosition) - def focusInEvent(self, event) -> None: - self.setEchoMode(QLineEdit.EchoMode.Normal) - super().focusInEvent(event) - - def focusOutEvent(self, event) -> None: - self.setEchoMode(QLineEdit.EchoMode.Password) - super().focusOutEvent(event) + def _on_reveal_toggled(self, checked: bool) -> None: + self.setEchoMode(QLineEdit.EchoMode.Normal if checked else QLineEdit.EchoMode.Password) class SettingsDialog(QDialog): @@ -98,6 +134,7 @@ def __init__( self._devices = devices if devices is not None else [] self._calibrate_fn = calibrate_fn + self._calib_thread: _CalibrateThread | None = None # Edit a deep copy so the original is untouched until accept. self._working = copy.deepcopy(config) @@ -140,7 +177,7 @@ def _build_ui(self) -> None: | QDialogButtonBox.StandardButton.Cancel | QDialogButtonBox.StandardButton.Apply ) - self._button_box.accepted.connect(self._validate_and_accept) + self._button_box.accepted.connect(self.accept) self._button_box.rejected.connect(self.reject) self._button_box.button(QDialogButtonBox.StandardButton.Apply).clicked.connect( self._on_apply @@ -358,7 +395,7 @@ def _build_audio_tab(self) -> None: self._populate_devices() form.addRow("Input Device:", self._device_combo) - self._calibrate_btn = QPushButton("Recalibrate RMS Threshold") + self._calibrate_btn = QPushButton(_CALIBRATE_LABEL) self._calibrate_btn.clicked.connect(self._on_calibrate) if self._calibrate_fn is None: self._calibrate_btn.setEnabled(False) @@ -509,37 +546,40 @@ def _select_device(self, cfg: AppConfig) -> None: return def _on_calibrate(self) -> None: - """Run RMS auto-calibration via the provided callback.""" - if self._calibrate_fn is None: + """Run RMS auto-calibration in a worker thread; keep the dialog responsive.""" + if self._calibrate_fn is None or self._calib_thread is not None: return - device_id = self._device_combo.currentData() - try: - QMessageBox.information( - self, - "Calibrating", - "Silence please — measuring ambient noise for 2 seconds...", - ) - threshold = self._calibrate_fn(device_id) - self._working.rms_threshold = threshold - self._rms_spin.setValue(threshold) - self._rms_label.setText(f"Threshold: {threshold:.1f}") - except Exception as e: - QMessageBox.warning(self, "Calibration Failed", str(e)) - - # ------------------------------------------------------------------ - # Validation - # ------------------------------------------------------------------ - - def _validate_and_accept(self) -> None: - """Validate required fields, then accept.""" - self._collect() - if not self._show_validation_issue(): - return - if not self._sync_startup_or_warn(): - return - - super().accept() + QMessageBox.information( + self, + "Calibrating", + "Silence please — measuring ambient noise for 2 seconds...", + ) + self._calibrate_btn.setEnabled(False) + self._calibrate_btn.setText("Calibrating...") + + thread = _CalibrateThread(self._calibrate_fn, self._device_combo.currentData(), self) + thread.succeeded.connect(self._on_calibrate_succeeded) + thread.failed.connect(self._on_calibrate_failed) + thread.finished.connect(self._on_calibrate_finished) + self._calib_thread = thread + thread.start() + + def _on_calibrate_succeeded(self, threshold: float) -> None: + self._working.rms_threshold = threshold + self._rms_spin.setValue(threshold) + self._rms_label.setText(f"Threshold: {threshold:.1f}") + + def _on_calibrate_failed(self, message: str) -> None: + QMessageBox.warning(self, "Calibration Failed", message) + + def _on_calibrate_finished(self) -> None: + thread = self._calib_thread + self._calib_thread = None + self._calibrate_btn.setEnabled(True) + self._calibrate_btn.setText(_CALIBRATE_LABEL) + if thread is not None: + thread.deleteLater() # ------------------------------------------------------------------ # Bottom bar actions @@ -578,8 +618,13 @@ def _on_apply(self) -> None: # ------------------------------------------------------------------ def accept(self) -> None: + """Validate on every accept path (OK button, direct accept() calls).""" self._stop_hotkey_recording() self._collect() + if not self._show_validation_issue(): + return + if not self._sync_startup_or_warn(): + return super().accept() def reject(self) -> None: @@ -588,6 +633,12 @@ def reject(self) -> None: def done(self, result: int) -> None: self._stop_hotkey_recording() + if self._calib_thread is not None and self._calib_thread.isRunning(): + # Don't let a running calibration outlive its parent dialog, and + # don't let late results land in slots of a closing dialog. + self._calib_thread.succeeded.disconnect(self._on_calibrate_succeeded) + self._calib_thread.failed.disconnect(self._on_calibrate_failed) + self._calib_thread.wait(5000) super().done(result) def is_hotkey_capture_active(self) -> bool: diff --git a/src/stt.py b/src/stt.py index fe4a9e2..db4548d 100644 --- a/src/stt.py +++ b/src/stt.py @@ -48,6 +48,8 @@ def transcribe(audio_wav: bytes, config: AppConfig) -> PipelineResult: warnings.append(AppError.STT_FALLBACK_USED) return PipelineResult(text=text, warnings=warnings) except ScreamerError: + # NO_SPEECH is final: silence in the primary's result will not + # become speech at the fallback, so don't retry it there. raise except Exception as e: log.warning("%s STT failed: %s", "Fallback" if is_fallback else "Primary", e) diff --git a/tests/test_config.py b/tests/test_config.py index d02bcd5..4cac914 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,10 +1,13 @@ import json import os +import platform +import sys import tempfile import unittest from pathlib import Path +from unittest.mock import patch -from src.config import AppConfig, ProviderConfig, import_from_env, parse_custom_headers, validate_config +from src.config import AppConfig, ProviderConfig, _env_path, import_from_env, parse_custom_headers, validate_config class ConfigValidationTests(unittest.TestCase): @@ -78,5 +81,71 @@ def test_import_from_env_backfills_empty_fields_only(self) -> None: self.assertEqual(imported.stt_model, "env-model") +class SecretHeaderTests(unittest.TestCase): + def test_custom_headers_are_secret_fields(self) -> None: + from src.config import _SECRET_FIELDS + + self.assertTrue( + { + "stt_custom_headers", + "stt_fallback_custom_headers", + "llm_custom_headers", + "llm_fallback_custom_headers", + } + <= _SECRET_FIELDS + ) + + @unittest.skipUnless(platform.system() == "Windows", "DPAPI requires Windows") + def test_save_config_keeps_headers_out_of_ini_and_roundtrips(self) -> None: + from src.config import load_config, save_config + + with tempfile.TemporaryDirectory() as tmp, patch("src.config.APP_DIR", tmp): + cfg = AppConfig(stt_custom_headers='{"X-Token": "s3cret"}') + save_config(cfg) + + ini = Path(tmp, "settings.ini").read_text(encoding="utf-8") + self.assertNotIn("s3cret", ini) + + self.assertEqual(load_config().stt_custom_headers, '{"X-Token": "s3cret"}') + + def test_has_plaintext_secrets_detects_stale_ini_values(self) -> None: + from src.config import _get_qsettings, has_plaintext_secrets + + with tempfile.TemporaryDirectory() as tmp, patch("src.config.APP_DIR", tmp): + self.assertFalse(has_plaintext_secrets()) + + stale = _get_qsettings() + stale.setValue("llm_custom_headers", '{"X-Old": "plain"}') + stale.sync() + del stale + + self.assertTrue(has_plaintext_secrets()) + + @unittest.skipUnless(platform.system() == "Windows", "DPAPI requires Windows") + def test_save_config_purges_stale_plaintext_headers(self) -> None: + from src.config import _get_qsettings, save_config + + with tempfile.TemporaryDirectory() as tmp, patch("src.config.APP_DIR", tmp): + stale = _get_qsettings() + stale.setValue("llm_custom_headers", '{"X-Old": "plain"}') + stale.sync() + del stale + + save_config(AppConfig()) + + ini = Path(tmp, "settings.ini").read_text(encoding="utf-8") + self.assertNotIn("X-Old", ini) + + +class EnvPathTests(unittest.TestCase): + def test_env_path_uses_cwd_when_not_frozen(self) -> None: + self.assertEqual(_env_path(), os.path.join(os.getcwd(), ".env")) + + def test_env_path_uses_exe_dir_when_frozen(self) -> None: + exe = os.path.join("C:" + os.sep, "apps", "Screamer", "Screamer.exe") + with patch.object(sys, "frozen", new=True, create=True), patch.object(sys, "executable", new=exe): + self.assertEqual(_env_path(), os.path.join(os.path.dirname(exe), ".env")) + + if __name__ == "__main__": unittest.main() diff --git a/tests/test_injector.py b/tests/test_injector.py new file mode 100644 index 0000000..01ddf98 --- /dev/null +++ b/tests/test_injector.py @@ -0,0 +1,19 @@ +import unittest + +from src.injector import _utf16_units + + +class Utf16UnitsTests(unittest.TestCase): + def test_ascii_maps_one_to_one(self) -> None: + self.assertEqual([ord(u) for u in _utf16_units("hi")], [0x68, 0x69]) + + def test_emoji_splits_into_surrogate_pair(self) -> None: + units = _utf16_units("\U0001F600") + self.assertEqual([ord(u) for u in units], [0xD83D, 0xDE00]) + + def test_empty_text_yields_no_units(self) -> None: + self.assertEqual(_utf16_units(""), []) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_settings_dialog.py b/tests/test_settings_dialog.py new file mode 100644 index 0000000..5f54113 --- /dev/null +++ b/tests/test_settings_dialog.py @@ -0,0 +1,116 @@ +import os +import unittest +from unittest.mock import patch + +os.environ.setdefault("QT_QPA_PLATFORM", "offscreen") + +from PySide6.QtCore import QEvent +from PySide6.QtGui import QFocusEvent +from PySide6.QtTest import QSignalSpy +from PySide6.QtWidgets import QApplication, QLineEdit + +from src.config import AppConfig +from src.settings_dialog import PasswordField, SettingsDialog + +_app = QApplication.instance() or QApplication([]) + + +class AcceptValidationTests(unittest.TestCase): + def test_accept_blocks_on_invalid_config(self) -> None: + dlg = SettingsDialog(AppConfig(), devices=[], calibrate_fn=None) + try: + with patch("src.settings_dialog.QMessageBox"), patch( + "src.settings_dialog.is_supported", return_value=False + ): + dlg.accept() + self.assertEqual(dlg.result(), 0) + finally: + dlg.deleteLater() + + def test_accept_passes_with_valid_config(self) -> None: + cfg = AppConfig(stt_api_key="k", stt_base_url="https://example.test/v1", stt_model="m") + dlg = SettingsDialog(cfg, devices=[], calibrate_fn=None) + try: + with patch("src.settings_dialog.QMessageBox"), patch( + "src.settings_dialog.is_supported", return_value=False + ): + dlg.accept() + self.assertEqual(dlg.result(), 1) + finally: + dlg.deleteLater() + + +class CalibrateThreadTests(unittest.TestCase): + def test_calibration_runs_off_ui_thread_and_updates_spin(self) -> None: + import threading + + release = threading.Event() + + def calibrate(device_id): + release.wait(5) + return 7.5 + + dlg = SettingsDialog(AppConfig(), devices=[], calibrate_fn=calibrate) + try: + with patch("src.settings_dialog.QMessageBox"): + dlg._on_calibrate() + self.assertFalse(dlg._calibrate_btn.isEnabled()) + thread = dlg._calib_thread + self.assertIsNotNone(thread) + spy = QSignalSpy(thread.finished) + release.set() + # wait() spins an event loop, delivering the queued result + # slots and _on_calibrate_finished before returning. + self.assertTrue(spy.wait(5000)) + self.assertIsNone(dlg._calib_thread) + self.assertEqual(dlg._rms_spin.value(), 7.5) + self.assertTrue(dlg._calibrate_btn.isEnabled()) + finally: + release.set() + dlg.deleteLater() + + + def test_close_during_calibration_drops_late_result(self) -> None: + import threading + + release = threading.Event() + + def slow_calibrate(device_id): + release.wait(5) + return 9.9 + + dlg = SettingsDialog(AppConfig(), devices=[], calibrate_fn=slow_calibrate) + try: + with patch("src.settings_dialog.QMessageBox"): + dlg._on_calibrate() + before = dlg._rms_spin.value() + # Release the worker shortly after done() starts waiting on it. + threading.Timer(0.05, release.set).start() + dlg.reject() + QApplication.processEvents() + self.assertEqual(dlg._rms_spin.value(), before) + finally: + release.set() + dlg.deleteLater() + + +class PasswordFieldTests(unittest.TestCase): + def test_stays_masked_on_focus(self) -> None: + field = PasswordField() + field.focusInEvent(QFocusEvent(QEvent.Type.FocusIn)) + self.assertEqual(field.echoMode(), QLineEdit.EchoMode.Password) + + def test_trailing_action_toggles_visibility(self) -> None: + field = PasswordField() + action = field.actions()[0] + self.assertTrue(action.isCheckable()) + + action.setChecked(True) + self.assertEqual(field.echoMode(), QLineEdit.EchoMode.Normal) + + action.setChecked(False) + self.assertEqual(field.echoMode(), QLineEdit.EchoMode.Password) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_stt_rewrite.py b/tests/test_stt_rewrite.py index 2a81b96..7e69ff0 100644 --- a/tests/test_stt_rewrite.py +++ b/tests/test_stt_rewrite.py @@ -175,6 +175,25 @@ def fake_post(_url, **kwargs): self.assertGreaterEqual(captured["max_completion_tokens"], 128) self.assertLessEqual(captured["max_completion_tokens"], 1024) + def test_rewrite_groq_cap_scales_for_long_dictation(self) -> None: + cfg = AppConfig( + llm_enabled=True, + llm_api_key="groq", + llm_base_url="https://api.groq.com/openai/v1", + llm_model="llama-3.1-8b-instant", + ) + captured: dict[str, object] = {} + + def fake_post(_url, **kwargs): + captured.update(kwargs["json"]) + return FakeResponse({"choices": [{"message": {"content": "fixed text"}, "finish_reason": "stop"}]}) + + with patch("src.http_client.post", side_effect=fake_post): + rewrite("a" * 10_000, cfg) + + # 10_000 chars -> ~2500 estimated input tokens -> cap int(2500 * 1.5) + 32 = 3782 + self.assertEqual(captured["max_completion_tokens"], 3782) + def test_rewrite_length_finish_keeps_original_text(self) -> None: cfg = AppConfig( llm_enabled=True, diff --git a/tests/test_tray_menu.py b/tests/test_tray_menu.py index bc2c406..1638a0e 100644 --- a/tests/test_tray_menu.py +++ b/tests/test_tray_menu.py @@ -1,6 +1,6 @@ import os import unittest -from unittest.mock import patch +from unittest.mock import Mock, patch os.environ.setdefault("QT_QPA_PLATFORM", "offscreen") @@ -34,6 +34,7 @@ def test_startup_mode_suppresses_incomplete_config_settings_dialog(self): patch.object(_TrayApp, "_build_tray"), patch.object(_TrayApp, "_build_hotkey"), patch.object(_TrayApp, "_apply_state"), + patch("src.main.has_plaintext_secrets", return_value=False), patch.object(_TrayApp, "_open_settings"), ] @@ -48,6 +49,90 @@ def test_startup_mode_suppresses_incomplete_config_settings_dialog(self): for p in reversed(patches): p.stop() + def _startup_patches(self, import_side_effect): + from src.config import AppConfig + + return [ + patch("src.main.load_config", return_value=AppConfig()), + patch("src.main.import_from_env", side_effect=import_side_effect), + patch("src.main.save_config"), + patch("src.main.validate_config", return_value=[]), + patch("src.main.AudioRecorder"), + patch.object(_TrayApp, "_build_tray"), + patch.object(_TrayApp, "_build_hotkey"), + patch.object(_TrayApp, "_apply_state"), + patch.object(_TrayApp, "_open_settings"), + patch("src.main.has_plaintext_secrets", return_value=False), + ] + + def test_startup_save_skipped_when_env_adds_nothing(self): + patches = self._startup_patches(lambda cfg: cfg) + started = [p.start() for p in patches] + try: + _TrayApp(startup_mode=True) + save_config_mock = started[2] + save_config_mock.assert_not_called() + finally: + for p in reversed(patches): + p.stop() + + def test_startup_save_runs_when_env_imports_values(self): + def fake_import(cfg): + cfg.stt_api_key = "imported" + return cfg + + patches = self._startup_patches(fake_import) + started = [p.start() for p in patches] + try: + _TrayApp(startup_mode=True) + save_config_mock = started[2] + save_config_mock.assert_called_once() + finally: + for p in reversed(patches): + p.stop() + + def test_startup_save_runs_when_plaintext_secrets_linger(self): + patches = self._startup_patches(lambda cfg: cfg) + patches[-1] = patch("src.main.has_plaintext_secrets", return_value=True) + started = [p.start() for p in patches] + try: + _TrayApp(startup_mode=True) + save_config_mock = started[2] + save_config_mock.assert_called_once() + finally: + for p in reversed(patches): + p.stop() + + def test_disable_while_recording_discards_audio(self): + from src.icons import TrayState + + tray_app = make_tray_app() + tray_app._recording = True + tray_app._recorder = Mock() + states = [] + tray_app._apply_state = lambda s: states.append(s) + finalized = [] + tray_app._finalize_recording = lambda: finalized.append(True) + + tray_app._toggle_enabled(False) + + self.assertFalse(tray_app._recording) + tray_app._recorder.stop.assert_called_once() + self.assertEqual(finalized, []) + self.assertEqual(states, [TrayState.IDLE]) + + def test_disable_while_processing_cancels_worker(self): + import threading + + tray_app = make_tray_app() + tray_app._recording = False + tray_app._worker = Mock() + tray_app._cancel_event = threading.Event() + + tray_app._toggle_enabled(False) + + self.assertTrue(tray_app._cancel_event.is_set()) + def test_choice_submenu_uses_widget_actions(self): from PySide6.QtWidgets import QWidgetAction