From 78b03aa12b93ba3d5b108a1155d53f54220780ab Mon Sep 17 00:00:00 2001 From: klpoland Date: Fri, 19 Jun 2026 11:01:43 -0400 Subject: [PATCH 1/8] gateway: federation export API, Redis events, sync API key --- gateway/.envs/example/django.env | 11 +++ gateway/config/api_router.py | 2 + gateway/config/settings/base.py | 19 +++- gateway/sds_gateway/api_methods/apps.py | 1 + .../sds_gateway/api_methods/authentication.py | 4 +- .../api_methods/federation/__init__.py | 0 .../api_methods/federation/events.py | 39 ++++++++ .../api_methods/federation/export_contract.py | 26 +++++ .../api_methods/federation/signals.py | 71 ++++++++++++++ .../helpers/compile_federated_data.py | 78 +++++++++++++++ gateway/sds_gateway/api_methods/models.py | 1 + .../sds_gateway/api_methods/permissions.py | 31 ++++++ .../serializers/capture_serializers.py | 53 ++++++++++ .../serializers/dataset_serializers.py | 40 ++++++++ .../api_methods/tests/test_authenticate.py | 2 +- .../tests/test_federation_export.py | 98 +++++++++++++++++++ .../tests/test_federation_export_contract.py | 94 ++++++++++++++++++ .../api_methods/views/capture_endpoints.py | 2 - .../api_methods/views/dataset_endpoints.py | 4 +- .../api_methods/views/federation_endpoints.py | 87 ++++++++++++++++ .../api_methods/views/file_endpoints.py | 3 - .../create_federation_sync_api_key.py | 46 +++++++++ ...alter_userapikey_source_federation_sync.py | 38 +++++++ .../users/migrations/max_migration.txt | 2 +- gateway/sds_gateway/users/models.py | 1 + 25 files changed, 740 insertions(+), 13 deletions(-) create mode 100644 gateway/sds_gateway/api_methods/federation/__init__.py create mode 100644 gateway/sds_gateway/api_methods/federation/events.py create mode 100644 gateway/sds_gateway/api_methods/federation/export_contract.py create mode 100644 gateway/sds_gateway/api_methods/federation/signals.py create mode 100644 gateway/sds_gateway/api_methods/helpers/compile_federated_data.py create mode 100644 gateway/sds_gateway/api_methods/permissions.py create mode 100644 gateway/sds_gateway/api_methods/tests/test_federation_export.py create mode 100644 gateway/sds_gateway/api_methods/tests/test_federation_export_contract.py create mode 100644 gateway/sds_gateway/api_methods/views/federation_endpoints.py create mode 100644 gateway/sds_gateway/users/management/commands/create_federation_sync_api_key.py create mode 100644 gateway/sds_gateway/users/migrations/0012_alter_userapikey_source_federation_sync.py diff --git a/gateway/.envs/example/django.env b/gateway/.envs/example/django.env index 1287a26fa..77e2990f8 100644 --- a/gateway/.envs/example/django.env +++ b/gateway/.envs/example/django.env @@ -8,6 +8,17 @@ HOSTNAME=localhost:8000 API_VERSION=v1 API_KEY= +# FEDERATION +# ------------------------------------------------------------------------------ +# the below environment variables are used for site federation with peers. +# By default, federation events are disabled. +# If FEDERATION_EVENTS_ENABLED is TRUE, deployment will need to include +# federation sync service configuration. +FEDERATION_SITE_NAME= +FEDERATION_EVENTS_ENABLED=false +FEDERATION_EVENTS_CHANNEL=federation:events +FEDERATION_SYNC_USER_EMAIL= + # AUTH0 # ------------------------------------------------------------------------------ # Set these from your Auth0 application settings diff --git a/gateway/config/api_router.py b/gateway/config/api_router.py index cc9a80f44..976ff5023 100644 --- a/gateway/config/api_router.py +++ b/gateway/config/api_router.py @@ -7,6 +7,7 @@ from sds_gateway.api_methods.views.dataset_endpoints import DatasetViewSet from sds_gateway.api_methods.views.file_endpoints import FileViewSet from sds_gateway.api_methods.views.file_endpoints import check_contents_exist +from sds_gateway.api_methods.views.federation_endpoints import FederationViewSet from sds_gateway.users.api.views import UserViewSet from sds_gateway.visualizations.api_views import VisualizationViewSet @@ -17,6 +18,7 @@ router.register(r"assets/files", FileViewSet, basename="files") router.register(r"assets/captures", CaptureViewSet, basename="captures") router.register(r"assets/datasets", DatasetViewSet, basename="datasets") +router.register(r"federation", FederationViewSet, basename="federation") if settings.VISUALIZATIONS_ENABLED: router.register(r"visualizations", VisualizationViewSet, basename="visualizations") diff --git a/gateway/config/settings/base.py b/gateway/config/settings/base.py index 87fb85ab4..8c483a22d 100644 --- a/gateway/config/settings/base.py +++ b/gateway/config/settings/base.py @@ -610,7 +610,10 @@ def _strip_endpoint_scheme(endpoint_url: str) -> str: "rest_framework.authentication.SessionAuthentication", "sds_gateway.api_methods.authentication.APIKeyAuthentication", ), - "DEFAULT_PERMISSION_CLASSES": ("rest_framework.permissions.IsAuthenticated",), + "DEFAULT_PERMISSION_CLASSES": ( + "rest_framework.permissions.IsAuthenticated", + "sds_gateway.api_methods.permissions.DisallowFederationSyncKey", + ), "DEFAULT_SCHEMA_CLASS": "drf_spectacular.openapi.AutoSchema", "DEFAULT_THROTTLE_RATES": { "vis_stream": VIS_STREAM_THROTTLE_RATE, @@ -709,6 +712,20 @@ def _strip_endpoint_scheme(endpoint_url: str) -> str: SDS_PROGRAMMATIC_SITE_NAME: str = env.str("SDS_PROGRAMMATIC_SITE_NAME", default="sds") SDS_SITE_FQDN: str = env.str("SDS_SITE_FQDN", default="localhost") +FEDERATION_SITE_NAME: str = env.str( + "FEDERATION_SITE_NAME", + default="", +) +FEDERATION_EVENTS_ENABLED: bool = env.bool("FEDERATION_EVENTS_ENABLED", default=False) +FEDERATION_EVENTS_CHANNEL: str = env.str( + "FEDERATION_EVENTS_CHANNEL", + default="federation:events", +) +FEDERATION_SYNC_USER_EMAIL: str = env.str( + "FEDERATION_SYNC_USER_EMAIL", + default="federation-sync@internal.local", +) + # ADMIN_CONSOLE_ENV is used to visually distinguish between different environments # (production, staging, local) in the admin console and error emails. It does not affect # any functionality and it is meant to prevent changes in production meant for testing diff --git a/gateway/sds_gateway/api_methods/apps.py b/gateway/sds_gateway/api_methods/apps.py index 73073e45a..2c38194ff 100644 --- a/gateway/sds_gateway/api_methods/apps.py +++ b/gateway/sds_gateway/api_methods/apps.py @@ -12,6 +12,7 @@ class ApiMethodsConfig(AppConfig): # pattern to import application modules here in ready() # ruff: noqa: PLC0415 def ready(self) -> None: + import sds_gateway.api_methods.federation.signals # noqa: F401 import sds_gateway.api_methods.schema # noqa: F401 silence_unwanted_logs() diff --git a/gateway/sds_gateway/api_methods/authentication.py b/gateway/sds_gateway/api_methods/authentication.py index 2a210478f..60e831970 100644 --- a/gateway/sds_gateway/api_methods/authentication.py +++ b/gateway/sds_gateway/api_methods/authentication.py @@ -13,7 +13,7 @@ class APIKeyAuthentication(BaseAuthentication): keyword = "Api-Key" - def authenticate(self, request) -> tuple[User, bool]: + def authenticate(self, request) -> tuple[User, UserAPIKey]: """Authenticates the user with their API key. Args: request: Contains the API key in the Authorization header. @@ -47,7 +47,7 @@ def authenticate(self, request) -> tuple[User, bool]: raise AuthenticationFailed(msg) from err user = api_key_obj.user - return (user, True) + return (user, api_key_obj) def authenticate_header(self, request) -> str: return self.keyword diff --git a/gateway/sds_gateway/api_methods/federation/__init__.py b/gateway/sds_gateway/api_methods/federation/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/gateway/sds_gateway/api_methods/federation/events.py b/gateway/sds_gateway/api_methods/federation/events.py new file mode 100644 index 000000000..f1d8ff897 --- /dev/null +++ b/gateway/sds_gateway/api_methods/federation/events.py @@ -0,0 +1,39 @@ +"""Publish federation change notifications to Redis.""" + +from __future__ import annotations + +import json +from datetime import UTC +from datetime import datetime +from typing import Any +from uuid import UUID + +from django.conf import settings +from loguru import logger as log + +from sds_gateway.api_methods.models import ItemType +from sds_gateway.api_methods.tasks import get_redis_client + +FederationEventType = str # created | updated | deleted + + +def publish_federation_event( + *, + event_type: FederationEventType, + item_type: ItemType, + uuid: UUID, + timestamp: datetime | None = None, +) -> None: + """Notify the local federation sync service via Redis pub/sub.""" + channel = getattr(settings, "FEDERATION_EVENTS_CHANNEL", "federation:events") + payload: dict[str, Any] = { + "event_type": event_type, + "item_type": item_type.value, + "uuid": str(uuid), + "timestamp": (timestamp or datetime.now(UTC)).isoformat(), + } + try: + client = get_redis_client() + client.publish(channel, json.dumps(payload)) + except Exception as err: # noqa: BLE001 + log.warning("Failed to publish federation event: {}", err) diff --git a/gateway/sds_gateway/api_methods/federation/export_contract.py b/gateway/sds_gateway/api_methods/federation/export_contract.py new file mode 100644 index 000000000..7d9931f67 --- /dev/null +++ b/gateway/sds_gateway/api_methods/federation/export_contract.py @@ -0,0 +1,26 @@ +"""Helpers to keep gateway export serializers aligned with sync Pydantic models.""" + +from __future__ import annotations + +from typing import Any + +from rest_framework.serializers import BaseSerializer + + +def serializer_output_field_names(serializer: BaseSerializer[Any]) -> set[str]: + return set(serializer.fields.keys()) + + +def assert_field_names_match( + serializer: BaseSerializer[Any], + pydantic_model: type, + *, + label: str, +) -> None: + expected = set(pydantic_model.model_fields.keys()) + actual = serializer_output_field_names(serializer) + if expected != actual: + missing = expected - actual + extra = actual - expected + msg = f"{label} field mismatch: missing={sorted(missing)!r} extra={sorted(extra)!r}" + raise AssertionError(msg) diff --git a/gateway/sds_gateway/api_methods/federation/signals.py b/gateway/sds_gateway/api_methods/federation/signals.py new file mode 100644 index 000000000..c65619971 --- /dev/null +++ b/gateway/sds_gateway/api_methods/federation/signals.py @@ -0,0 +1,71 @@ +"""Django signals that publish federation Redis events.""" + +from __future__ import annotations + +from django.conf import settings +from django.db.models.signals import post_save +from django.dispatch import receiver +from loguru import logger as log + +from sds_gateway.api_methods.federation.events import publish_federation_event +from sds_gateway.api_methods.helpers.compile_federated_data import ( + is_federation_exportable_capture, +) +from sds_gateway.api_methods.helpers.compile_federated_data import ( + is_federation_exportable_dataset, +) +from sds_gateway.api_methods.models import Capture +from sds_gateway.api_methods.models import Dataset +from sds_gateway.api_methods.models import ItemType + + +def _event_type(*, created: bool, exportable: bool) -> str: + if not exportable: + return "deleted" + return "created" if created else "updated" + + +@receiver(post_save, sender=Dataset) +def federation_dataset_changed( + sender: type[Dataset], + instance: Dataset, + created: bool, + **kwargs, +) -> None: + if not getattr(settings, "FEDERATION_EVENTS_ENABLED", True): + log.debug( + "FEDERATION_EVENTS_ENABLED is False, " + "skipping federation dataset changed signal", + ) + return + + exportable = is_federation_exportable_dataset(instance) + publish_federation_event( + event_type=_event_type(created=created, exportable=exportable), + item_type=ItemType.DATASET, + uuid=instance.uuid, + timestamp=instance.updated_at, + ) + + +@receiver(post_save, sender=Capture) +def federation_capture_changed( + sender: type[Capture], + instance: Capture, + created: bool, + **kwargs, +) -> None: + if not getattr(settings, "FEDERATION_EVENTS_ENABLED", True): + log.debug( + "FEDERATION_EVENTS_ENABLED is False, " + "skipping federation capture changed signal", + ) + return + + exportable = is_federation_exportable_capture(instance) + publish_federation_event( + event_type=_event_type(created=created, exportable=exportable), + item_type=ItemType.CAPTURE, + uuid=instance.uuid, + timestamp=instance.updated_at, + ) diff --git a/gateway/sds_gateway/api_methods/helpers/compile_federated_data.py b/gateway/sds_gateway/api_methods/helpers/compile_federated_data.py new file mode 100644 index 000000000..48db49377 --- /dev/null +++ b/gateway/sds_gateway/api_methods/helpers/compile_federated_data.py @@ -0,0 +1,78 @@ +"""Build federation export payloads (RFC fed-* index shape).""" + +from __future__ import annotations + +from typing import TYPE_CHECKING +from typing import Any + +from django.conf import settings + +from sds_gateway.api_methods.models import Capture +from sds_gateway.api_methods.models import Dataset +from sds_gateway.api_methods.models import DatasetStatus +from sds_gateway.api_methods.serializers.dataset_serializers import ( + get_dataset_serializer, +) +from sds_gateway.api_methods.serializers.dataset_serializers import ( + DatasetFederationSerializer, +) +from sds_gateway.api_methods.serializers.capture_serializers import ( + CaptureFederationSerializer, +) + +if TYPE_CHECKING: + from django.db.models import QuerySet + + +def federation_site_name() -> str: + return getattr(settings, "FEDERATION_SITE_NAME", settings.SDS_PROGRAMMATIC_SITE_NAME) + + +def public_datasets_queryset() -> QuerySet[Dataset]: + return ( + Dataset.objects.filter( + status=DatasetStatus.FINAL, + is_public=True, + is_deleted=False, + ) + .prefetch_related("keywords", "owner") + .order_by("-updated_at") + ) + + +def public_captures_queryset() -> QuerySet[Capture]: + return Capture.objects.filter(is_public=True, is_deleted=False).order_by( + "-updated_at", + ) + + +def is_federation_exportable_dataset(dataset: Dataset) -> bool: + return ( + not dataset.is_deleted + and dataset.is_public + and dataset.status == DatasetStatus.FINAL + ) + + +def is_federation_exportable_capture(capture: Capture) -> bool: + return not capture.is_deleted and capture.is_public + + +def compile_federated_dataset_doc(dataset: Dataset) -> dict[str, Any]: + """Serialize a public dataset for federation sync / OpenSearch.""" + site_name = federation_site_name() + federation_data = DatasetFederationSerializer( + dataset, + context={"site_name": site_name}, + ) + return federation_data.data + + +def compile_federated_capture_doc(capture: Capture) -> dict[str, Any]: + """Serialize a public capture for federation sync / OpenSearch.""" + site_name = federation_site_name() + federation_data = CaptureFederationSerializer( + capture, + context={"site_name": site_name}, + ) + return federation_data.data diff --git a/gateway/sds_gateway/api_methods/models.py b/gateway/sds_gateway/api_methods/models.py index 8c56b3c87..322fef020 100644 --- a/gateway/sds_gateway/api_methods/models.py +++ b/gateway/sds_gateway/api_methods/models.py @@ -88,6 +88,7 @@ class KeySources(StrEnum): SDSWebUI = "sds_web_ui" SVIBackend = "svi_backend" SVIWebUI = "svi_web_ui" + FederationSync = "federation_sync" class ItemType(StrEnum): diff --git a/gateway/sds_gateway/api_methods/permissions.py b/gateway/sds_gateway/api_methods/permissions.py new file mode 100644 index 000000000..2af3b8929 --- /dev/null +++ b/gateway/sds_gateway/api_methods/permissions.py @@ -0,0 +1,31 @@ +"""DRF permissions for API key scoping.""" + +from rest_framework.permissions import BasePermission + +from sds_gateway.api_methods.models import KeySources +from sds_gateway.users.models import UserAPIKey + + +class IsFederationSyncKey(BasePermission): + """Allow only federation sync service API keys.""" + + message = "Federation sync API key required." + + def has_permission(self, request, view) -> bool: + key = request.auth + return isinstance(key, UserAPIKey) and key.source == KeySources.FederationSync + + +class DisallowFederationSyncKey(BasePermission): + """Block federation sync keys from non-federation API routes. + + Applied globally via REST_FRAMEWORK DEFAULT_PERMISSION_CLASSES. + """ + + message = "This API key is restricted to federation export endpoints." + + def has_permission(self, request, view) -> bool: + key = request.auth + if isinstance(key, UserAPIKey) and key.source == KeySources.FederationSync: + return False + return True diff --git a/gateway/sds_gateway/api_methods/serializers/capture_serializers.py b/gateway/sds_gateway/api_methods/serializers/capture_serializers.py index fd5c4cb4d..56156b777 100644 --- a/gateway/sds_gateway/api_methods/serializers/capture_serializers.py +++ b/gateway/sds_gateway/api_methods/serializers/capture_serializers.py @@ -12,6 +12,7 @@ from rest_framework.utils.serializer_helpers import ReturnList from sds_gateway.api_methods.helpers.index_handling import retrieve_indexed_metadata +from sds_gateway.api_methods.utils.relationship_utils import get_capture_datasets from sds_gateway.api_methods.models import Capture from sds_gateway.api_methods.models import CaptureType from sds_gateway.api_methods.models import DEPRECATEDPostProcessedData @@ -883,3 +884,55 @@ def serialize_capture_or_composite( # Serialize as single capture serializer = CaptureGetSerializer(capture_data["capture"], context=context) return serializer.data + + +class CaptureFederationSerializer(serializers.ModelSerializer[Capture]): + """Public-safe capture payload for federation export (sync / OpenSearch).""" + + site_name = serializers.SerializerMethodField() + file_count = serializers.SerializerMethodField() + size = serializers.SerializerMethodField() + capture_props = serializers.SerializerMethodField() + dataset_ids = serializers.SerializerMethodField() + created_at = serializers.DateTimeField( + format="%Y-%m-%d %H:%M:%S%z", + read_only=True, + ) + updated_at = serializers.DateTimeField( + format="%Y-%m-%d %H:%M:%S%z", + read_only=True, + ) + + class Meta: + model = Capture + fields = [ + "uuid", + "name", + "capture_type", + "channel", + "scan_group", + "top_level_dir", + "created_at", + "updated_at", + "site_name", + "file_count", + "size", + "capture_props", + "dataset_ids", + ] + + def get_site_name(self, obj: Capture) -> str: + return str((self.context or {})["site_name"]) + + def get_file_count(self, obj: Capture) -> int: + return int(obj.get_files_summary()["total_count"]) + + def get_size(self, obj: Capture) -> int: + return int(obj.get_files_summary()["total_size"]) + + def get_capture_props(self, obj: Capture) -> dict[str, Any]: + return obj.get_opensearch_metadata() or {} + + def get_dataset_ids(self, obj: Capture) -> list[str]: + qs = get_capture_datasets(obj, include_deleted=False) + return [str(dataset.uuid) for dataset in qs] \ No newline at end of file diff --git a/gateway/sds_gateway/api_methods/serializers/dataset_serializers.py b/gateway/sds_gateway/api_methods/serializers/dataset_serializers.py index 9f57ceffa..519a4bf79 100644 --- a/gateway/sds_gateway/api_methods/serializers/dataset_serializers.py +++ b/gateway/sds_gateway/api_methods/serializers/dataset_serializers.py @@ -334,6 +334,46 @@ class Meta: ] +class DatasetFederationSerializer(DatasetPublicSerializer): + """Serializer for dataset data for federation export.""" + + site_name = serializers.SerializerMethodField() + updated_at = serializers.DateTimeField( + format=READABLE_ISO_DATE_TIME, + read_only=True, + ) + size = serializers.SerializerMethodField() + capture_count = serializers.SerializerMethodField() + capture_file_count = serializers.SerializerMethodField() + artifact_file_count = serializers.SerializerMethodField() + + class Meta(DatasetPublicSerializer.Meta): + fields = [ + *DatasetPublicSerializer.Meta.fields, + "updated_at", + "site_name", + "size", + "capture_count", + "capture_file_count", + "artifact_file_count", + ] + + def get_site_name(self, obj: Dataset) -> str: + return str((self.context or {})["site_name"]) + + def get_size(self, obj: Dataset) -> int: + return int(obj.get_dataset_file_statistics()["total_size"]) + + def get_capture_count(self, obj: Dataset) -> int: + return obj.captures.filter(is_deleted=False).count() + + def get_capture_file_count(self, obj: Dataset) -> int: + return int(obj.get_dataset_file_statistics()["captures"]) + + def get_artifact_file_count(self, obj: Dataset) -> int: + return int(obj.get_dataset_file_statistics()["artifacts"]) + + def get_dataset_serializer( dataset: Dataset, *, diff --git a/gateway/sds_gateway/api_methods/tests/test_authenticate.py b/gateway/sds_gateway/api_methods/tests/test_authenticate.py index f070fc251..7c9620213 100644 --- a/gateway/sds_gateway/api_methods/tests/test_authenticate.py +++ b/gateway/sds_gateway/api_methods/tests/test_authenticate.py @@ -34,7 +34,7 @@ def test_auth_valid_key(self) -> None: user, auth = self.auth.authenticate(request) # Verify that the user is authenticated - assert auth is True + assert isinstance(auth, UserAPIKey) assert user is not None assert user.email == "test@example.com" diff --git a/gateway/sds_gateway/api_methods/tests/test_federation_export.py b/gateway/sds_gateway/api_methods/tests/test_federation_export.py new file mode 100644 index 000000000..1c89736a8 --- /dev/null +++ b/gateway/sds_gateway/api_methods/tests/test_federation_export.py @@ -0,0 +1,98 @@ +"""Tests for federation export endpoints and API key scoping.""" + +from django.contrib.auth import get_user_model +from django.urls import reverse +from rest_framework import status +from rest_framework.test import APITestCase + +from sds_gateway.api_methods.models import DatasetStatus +from sds_gateway.api_methods.models import KeySources +from sds_gateway.api_methods.tests.factories import CaptureFactory +from sds_gateway.api_methods.tests.factories import DatasetFactory +from sds_gateway.users.models import UserAPIKey + +User = get_user_model() + + +class FederationExportAPITest(APITestCase): + def setUp(self) -> None: + self.owner = User.objects.create(email="owner@example.com", is_approved=True) + self.sync_user = User.objects.create( + email="sync@internal.local", + is_approved=True, + ) + _obj, self.sync_key = UserAPIKey.objects.create_key( + name="sync", + user=self.sync_user, + source=KeySources.FederationSync, + ) + _obj, self.user_key = UserAPIKey.objects.create_key( + name="regular", + user=self.owner, + source=KeySources.SDSWebUI, + ) + self.public_dataset = DatasetFactory( + owner=self.owner, + is_public=True, + status=DatasetStatus.FINAL, + keywords=None, + ) + self.private_dataset = DatasetFactory( + owner=self.owner, + is_public=False, + status=DatasetStatus.DRAFT, + keywords=None, + ) + self.public_capture = CaptureFactory(owner=self.owner, is_public=True) + self.list_datasets_url = reverse( + "api:federation-export-datasets-list", + ) + self.detail_dataset_url = reverse( + "api:federation-export-dataset-detail", + kwargs={"pk": str(self.public_dataset.uuid)}, + ) + self.list_captures_url = reverse( + "api:federation-export-captures-list", + ) + + def _auth(self, key: str) -> dict[str, str]: + return {"HTTP_AUTHORIZATION": f"Api-Key: {key}"} + + def test_sync_key_can_list_public_datasets(self) -> None: + response = self.client.get(self.list_datasets_url, **self._auth(self.sync_key)) + assert response.status_code == status.HTTP_200_OK + uuids = {row["uuid"] for row in response.json()} + assert str(self.public_dataset.uuid) in uuids + assert str(self.private_dataset.uuid) not in uuids + + def test_sync_key_can_retrieve_public_dataset(self) -> None: + response = self.client.get( + self.detail_dataset_url, + **self._auth(self.sync_key), + ) + assert response.status_code == status.HTTP_200_OK + assert response.json()["uuid"] == str(self.public_dataset.uuid) + assert response.json()["site_name"] + + def test_regular_key_denied_on_export(self) -> None: + response = self.client.get(self.list_datasets_url, **self._auth(self.user_key)) + assert response.status_code == status.HTTP_403_FORBIDDEN + + def test_sync_key_denied_on_dataset_assets_api(self) -> None: + url = reverse( + "api:datasets-detail", + kwargs={"pk": str(self.public_dataset.uuid)}, + ) + response = self.client.get(url, **self._auth(self.sync_key)) + assert response.status_code == status.HTTP_403_FORBIDDEN + + def test_sync_key_denied_on_capture_list(self) -> None: + url = reverse("api:captures-list") + response = self.client.get(url, **self._auth(self.sync_key)) + assert response.status_code == status.HTTP_403_FORBIDDEN + + def test_export_captures_list(self) -> None: + response = self.client.get(self.list_captures_url, **self._auth(self.sync_key)) + assert response.status_code == status.HTTP_200_OK + uuids = {row["uuid"] for row in response.json()} + assert str(self.public_capture.uuid) in uuids diff --git a/gateway/sds_gateway/api_methods/tests/test_federation_export_contract.py b/gateway/sds_gateway/api_methods/tests/test_federation_export_contract.py new file mode 100644 index 000000000..0c86277d1 --- /dev/null +++ b/gateway/sds_gateway/api_methods/tests/test_federation_export_contract.py @@ -0,0 +1,94 @@ +"""Contract tests: gateway federation export JSON โ†” sync Pydantic models.""" + +from __future__ import annotations + +import sys +from pathlib import Path + +import pytest +from django.contrib.auth import get_user_model + +from sds_gateway.api_methods.federation.export_contract import assert_field_names_match +from sds_gateway.api_methods.helpers.compile_federated_data import ( + compile_federated_capture_doc, + compile_federated_dataset_doc, +) +from sds_gateway.api_methods.models import DatasetStatus +from sds_gateway.api_methods.serializers.capture_serializers import ( + CaptureFederationSerializer, +) +from sds_gateway.api_methods.serializers.dataset_serializers import ( + DatasetFederationSerializer, +) +from sds_gateway.api_methods.tests.factories import CaptureFactory +from sds_gateway.api_methods.tests.factories import DatasetFactory + +_repo_root = Path(__file__).resolve().parents[4] +_federation_root = _repo_root / "federation" +if _federation_root.is_dir(): + sys.path.insert(0, str(_federation_root)) + +pytest.importorskip("sds_federation") + +from sds_federation.schemas.webhooks import ( # noqa: E402 + FederatedCaptureDoc, + FederatedDatasetDoc, +) + +User = get_user_model() + + +@pytest.mark.django_db +def test_dataset_export_field_names_match_pydantic() -> None: + owner = User.objects.create(email="owner@example.com", is_approved=True) + dataset = DatasetFactory( + owner=owner, + is_public=True, + status=DatasetStatus.FINAL, + keywords=None, + ) + serializer = DatasetFederationSerializer( + dataset, + context={"site_name": "crc"}, + ) + assert_field_names_match( + serializer, + FederatedDatasetDoc, + label="DatasetFederationSerializer", + ) + + +@pytest.mark.django_db +def test_capture_export_field_names_match_pydantic() -> None: + owner = User.objects.create(email="cap-owner@example.com", is_approved=True) + capture = CaptureFactory(owner=owner, is_public=True) + serializer = CaptureFederationSerializer( + capture, + context={"site_name": "crc"}, + ) + assert_field_names_match( + serializer, + FederatedCaptureDoc, + label="CaptureFederationSerializer", + ) + + +@pytest.mark.django_db +def test_compile_federated_dataset_doc_validates_against_pydantic() -> None: + owner = User.objects.create(email="d@example.com", is_approved=True) + dataset = DatasetFactory( + owner=owner, + is_public=True, + status=DatasetStatus.FINAL, + keywords=None, + ) + payload = compile_federated_dataset_doc(dataset) + FederatedDatasetDoc.model_validate(payload) + + +@pytest.mark.django_db +def test_compile_federated_capture_doc_validates_against_pydantic() -> None: + owner = User.objects.create(email="c@example.com", is_approved=True) + capture = CaptureFactory(owner=owner, is_public=True) + payload = compile_federated_capture_doc(capture) + FederatedCaptureDoc.model_validate(payload) diff --git a/gateway/sds_gateway/api_methods/views/capture_endpoints.py b/gateway/sds_gateway/api_methods/views/capture_endpoints.py index 94f23808e..9e2ef28a6 100644 --- a/gateway/sds_gateway/api_methods/views/capture_endpoints.py +++ b/gateway/sds_gateway/api_methods/views/capture_endpoints.py @@ -27,7 +27,6 @@ from rest_framework.authentication import SessionAuthentication from rest_framework.decorators import action from rest_framework.pagination import PageNumberPagination -from rest_framework.permissions import IsAuthenticated from rest_framework.request import Request from rest_framework.response import Response @@ -208,7 +207,6 @@ class CapturePagination(PageNumberPagination): class CaptureViewSet(viewsets.ViewSet): authentication_classes = [SessionAuthentication, APIKeyAuthentication] - permission_classes = [IsAuthenticated] def _validate_and_index_metadata( self, diff --git a/gateway/sds_gateway/api_methods/views/dataset_endpoints.py b/gateway/sds_gateway/api_methods/views/dataset_endpoints.py index 0785ece85..6125fa0f0 100644 --- a/gateway/sds_gateway/api_methods/views/dataset_endpoints.py +++ b/gateway/sds_gateway/api_methods/views/dataset_endpoints.py @@ -11,7 +11,6 @@ from rest_framework import status from rest_framework.authentication import SessionAuthentication from rest_framework.decorators import action -from rest_framework.permissions import IsAuthenticated from rest_framework.response import Response from rest_framework.viewsets import ViewSet @@ -56,8 +55,7 @@ def _truthy_query_param(raw: str | None) -> bool: class DatasetViewSet(ViewSet): - authentication_classes = [SessionAuthentication, APIKeyAuthentication] - permission_classes = [IsAuthenticated] + authentication_classes = [APIKeyAuthentication] def _get_file_objects( self, diff --git a/gateway/sds_gateway/api_methods/views/federation_endpoints.py b/gateway/sds_gateway/api_methods/views/federation_endpoints.py new file mode 100644 index 000000000..2f9c06016 --- /dev/null +++ b/gateway/sds_gateway/api_methods/views/federation_endpoints.py @@ -0,0 +1,87 @@ +"""Federation export API (sync service โ†’ gateway, Postgres public metadata).""" + +from __future__ import annotations + +from django.shortcuts import get_object_or_404 +from drf_spectacular.utils import extend_schema +from rest_framework.decorators import action +from rest_framework.request import Request +from rest_framework.response import Response +from rest_framework.viewsets import ViewSet + +from sds_gateway.api_methods.authentication import APIKeyAuthentication +from sds_gateway.api_methods.helpers.compile_federated_data import ( + compile_federated_capture_doc, +) +from sds_gateway.api_methods.helpers.compile_federated_data import ( + compile_federated_dataset_doc, +) +from sds_gateway.api_methods.helpers.compile_federated_data import ( + is_federation_exportable_capture, +) +from sds_gateway.api_methods.helpers.compile_federated_data import ( + is_federation_exportable_dataset, +) +from sds_gateway.api_methods.helpers.compile_federated_data import ( + public_captures_queryset, +) +from sds_gateway.api_methods.helpers.compile_federated_data import ( + public_datasets_queryset, +) +from sds_gateway.api_methods.models import Capture +from sds_gateway.api_methods.models import Dataset +from sds_gateway.api_methods.permissions import IsFederationSyncKey + + +@extend_schema(exclude=True) +class FederationViewSet(ViewSet): + """Internal export endpoints for the federation sync service.""" + + authentication_classes = [APIKeyAuthentication] + permission_classes = [IsFederationSyncKey] + + @action(detail=False, methods=["get"], url_path="export/datasets") + def export_datasets_list(self, request: Request) -> Response: + """List all public finalized datasets for federation bootstrap.""" + datasets = public_datasets_queryset() + return Response( + [compile_federated_dataset_doc(dataset) for dataset in datasets], + ) + + @action( + detail=False, + methods=["get"], + url_path=r"export/datasets/(?P[^/.]+)", + ) + def export_dataset_detail(self, request: Request, pk: str | None = None) -> Response: + """Return one public dataset for sync after a local Redis event.""" + dataset = get_object_or_404(Dataset, pk=pk, is_deleted=False) + if not is_federation_exportable_dataset(dataset): + return Response( + {"detail": "Dataset is not available for federation export."}, + status=404, + ) + return Response(compile_federated_dataset_doc(dataset)) + + @action(detail=False, methods=["get"], url_path="export/captures") + def export_captures_list(self, request: Request) -> Response: + """List all public captures for federation bootstrap.""" + captures = public_captures_queryset() + return Response( + [compile_federated_capture_doc(capture) for capture in captures], + ) + + @action( + detail=False, + methods=["get"], + url_path=r"export/captures/(?P[^/.]+)", + ) + def export_capture_detail(self, request: Request, pk: str | None = None) -> Response: + """Return one public capture for sync after a local Redis event.""" + capture = get_object_or_404(Capture, pk=pk, is_deleted=False) + if not is_federation_exportable_capture(capture): + return Response( + {"detail": "Capture is not available for federation export."}, + status=404, + ) + return Response(compile_federated_capture_doc(capture)) diff --git a/gateway/sds_gateway/api_methods/views/file_endpoints.py b/gateway/sds_gateway/api_methods/views/file_endpoints.py index 0a441566b..1e9308df8 100644 --- a/gateway/sds_gateway/api_methods/views/file_endpoints.py +++ b/gateway/sds_gateway/api_methods/views/file_endpoints.py @@ -24,7 +24,6 @@ from rest_framework import status from rest_framework.decorators import action from rest_framework.pagination import PageNumberPagination -from rest_framework.permissions import IsAuthenticated from rest_framework.request import Request from rest_framework.response import Response from rest_framework.views import APIView @@ -65,7 +64,6 @@ class FilePagination(PageNumberPagination): class FileViewSet(ViewSet): authentication_classes = [APIKeyAuthentication] - permission_classes = [IsAuthenticated] @staticmethod def _paginated_list_response( @@ -698,7 +696,6 @@ def download_file(self, request: Request, pk: str | None = None) -> HttpResponse class CheckFileContentsExistView(APIView): authentication_classes = [APIKeyAuthentication] - permission_classes = [IsAuthenticated] @extend_schema( request=FilePostSerializer, diff --git a/gateway/sds_gateway/users/management/commands/create_federation_sync_api_key.py b/gateway/sds_gateway/users/management/commands/create_federation_sync_api_key.py new file mode 100644 index 000000000..2c7ede4e2 --- /dev/null +++ b/gateway/sds_gateway/users/management/commands/create_federation_sync_api_key.py @@ -0,0 +1,46 @@ +"""Create the federation sync service user and API key.""" + +from django.conf import settings +from django.contrib.auth import get_user_model +from django.core.management.base import BaseCommand +from loguru import logger as log + +from sds_gateway.api_methods.models import KeySources +from sds_gateway.users.models import UserAPIKey + + +class Command(BaseCommand): + """Provision federation-sync@internal user and a FederationSync API key.""" + + help = "Create federation sync service user and API key (prints raw key once)." + + def handle(self, *args, **options) -> None: + user_model = get_user_model() + email = settings.FEDERATION_SYNC_USER_EMAIL + + user, created = user_model.objects.get_or_create( + email=email, + defaults={ + "is_active": True, + "is_approved": True, + }, + ) + if created: + user.set_unusable_password() + user.save(update_fields=["password"]) + log.info("Created federation sync user {}", email) + else: + log.info("Using existing federation sync user {}", email) + + UserAPIKey.objects.filter( + user=user, + source=KeySources.FederationSync, + ).delete() + + _obj, raw_key = UserAPIKey.objects.create_key( + name="federation-sync", + user=user, + source=KeySources.FederationSync, + description="Federation sync service (export endpoints only)", + ) + self.stdout.write(self.style.SUCCESS(f"Federation sync API key: {raw_key}")) diff --git a/gateway/sds_gateway/users/migrations/0012_alter_userapikey_source_federation_sync.py b/gateway/sds_gateway/users/migrations/0012_alter_userapikey_source_federation_sync.py new file mode 100644 index 000000000..084b10c0b --- /dev/null +++ b/gateway/sds_gateway/users/migrations/0012_alter_userapikey_source_federation_sync.py @@ -0,0 +1,38 @@ +# Generated manually for FederationSync API key source + +from django.db import migrations, models + +import sds_gateway.api_methods.models + + +class Migration(migrations.Migration): + + dependencies = [ + ("users", "0011_user_orcid_id_alter_user_is_approved"), + ] + + operations = [ + migrations.AlterField( + model_name="userapikey", + name="source", + field=models.CharField( + choices=[ + (sds_gateway.api_methods.models.KeySources["SDSWebUI"], "SDS Web UI"), + ( + sds_gateway.api_methods.models.KeySources["SVIBackend"], + "SVI Backend", + ), + ( + sds_gateway.api_methods.models.KeySources["SVIWebUI"], + "SVI Web UI", + ), + ( + sds_gateway.api_methods.models.KeySources["FederationSync"], + "Federation Sync", + ), + ], + default=sds_gateway.api_methods.models.KeySources["SDSWebUI"], + max_length=255, + ), + ), + ] diff --git a/gateway/sds_gateway/users/migrations/max_migration.txt b/gateway/sds_gateway/users/migrations/max_migration.txt index fda121cc0..7707335c4 100644 --- a/gateway/sds_gateway/users/migrations/max_migration.txt +++ b/gateway/sds_gateway/users/migrations/max_migration.txt @@ -1 +1 @@ -0011_user_orcid_id_alter_user_is_approved +0012_alter_userapikey_source_federation_sync \ No newline at end of file diff --git a/gateway/sds_gateway/users/models.py b/gateway/sds_gateway/users/models.py index b1a31d0db..f5b23370f 100644 --- a/gateway/sds_gateway/users/models.py +++ b/gateway/sds_gateway/users/models.py @@ -64,6 +64,7 @@ class UserAPIKey(AbstractAPIKey): (KeySources.SDSWebUI, "SDS Web UI"), (KeySources.SVIBackend, "SVI Backend"), (KeySources.SVIWebUI, "SVI Web UI"), + (KeySources.FederationSync, "Federation Sync"), ] user = cast( "User", From 7b6f1e0d9d2cfffc73ceadce54669f9b85e9e36e Mon Sep 17 00:00:00 2001 From: klpoland Date: Fri, 19 Jun 2026 11:39:01 -0400 Subject: [PATCH 2/8] federation config hardening --- gateway/.envs/example/django.env | 14 +- gateway/config/settings/base.py | 49 +++++ gateway/sds_gateway/api_methods/apps.py | 5 + .../api_methods/federation/availability.py | 175 +++++++++++++++ .../api_methods/federation/events.py | 4 + .../api_methods/federation/permissions.py | 44 ++++ .../api_methods/federation/signals.py | 26 ++- .../tests/test_federation_export.py | 26 ++- .../tests/test_federation_hardening.py | 205 ++++++++++++++++++ .../api_methods/views/federation_endpoints.py | 8 +- 10 files changed, 537 insertions(+), 19 deletions(-) create mode 100644 gateway/sds_gateway/api_methods/federation/availability.py create mode 100644 gateway/sds_gateway/api_methods/federation/permissions.py create mode 100644 gateway/sds_gateway/api_methods/tests/test_federation_hardening.py diff --git a/gateway/.envs/example/django.env b/gateway/.envs/example/django.env index 77e2990f8..356c45345 100644 --- a/gateway/.envs/example/django.env +++ b/gateway/.envs/example/django.env @@ -12,12 +12,16 @@ API_KEY= # ------------------------------------------------------------------------------ # the below environment variables are used for site federation with peers. # By default, federation events are disabled. -# If FEDERATION_EVENTS_ENABLED is TRUE, deployment will need to include +# If FEDERATION_ENABLED is TRUE, deployment will need to include # federation sync service configuration. -FEDERATION_SITE_NAME= -FEDERATION_EVENTS_ENABLED=false -FEDERATION_EVENTS_CHANNEL=federation:events -FEDERATION_SYNC_USER_EMAIL= +# FEDERATION_ENABLED=true +# FEDERATION_EVENTS_ENABLED=true +# FEDERATION_EVENTS_CHANNEL=federation:events +# FEDERATION_SYNC_USER_EMAIL= +# FEDERATION_SITE_NAME= +# FEDERATION_SYNC_HEALTH_URL=http://federation-sync:8000/sync/health +# FEDERATION_EXPORT_INTERNAL_HEADER_SECRET= +# FEDERATION_EXPORT_TRUST_X_FORWARDED_FOR=false # AUTH0 # ------------------------------------------------------------------------------ diff --git a/gateway/config/settings/base.py b/gateway/config/settings/base.py index 8c483a22d..20ceef92b 100644 --- a/gateway/config/settings/base.py +++ b/gateway/config/settings/base.py @@ -716,6 +716,8 @@ def _strip_endpoint_scheme(endpoint_url: str) -> str: "FEDERATION_SITE_NAME", default="", ) +# Master switch: when False, federation export and Redis events are inactive. +FEDERATION_ENABLED: bool = env.bool("FEDERATION_ENABLED", default=False) FEDERATION_EVENTS_ENABLED: bool = env.bool("FEDERATION_EVENTS_ENABLED", default=False) FEDERATION_EVENTS_CHANNEL: str = env.str( "FEDERATION_EVENTS_CHANNEL", @@ -725,6 +727,53 @@ def _strip_endpoint_scheme(endpoint_url: str) -> str: "FEDERATION_SYNC_USER_EMAIL", default="federation-sync@internal.local", ) +# Sync service health (e.g. http://federation-sync:8000/sync/health). +FEDERATION_SYNC_HEALTH_URL: str = env.str("FEDERATION_SYNC_HEALTH_URL", default="") +FEDERATION_SYNC_HEALTH_PROBE_TIMEOUT: float = env.float( + "FEDERATION_SYNC_HEALTH_PROBE_TIMEOUT", + default=2.0, +) +FEDERATION_SKIP_SYNC_HEALTH_PROBE: bool = env.bool( + "FEDERATION_SKIP_SYNC_HEALTH_PROBE", + default=False, +) +FEDERATION_SKIP_SYNC_API_KEY_CHECK: bool = env.bool( + "FEDERATION_SKIP_SYNC_API_KEY_CHECK", + default=False, +) +FEDERATION_SKIP_REDIS_PROBE: bool = env.bool( + "FEDERATION_SKIP_REDIS_PROBE", + default=False, +) +# Set at startup / periodic recheck by federation.availability. +FEDERATION_OPERATIONAL: bool = False +FEDERATION_OPERATIONAL_REASON: str = "" +# Tests may set via override_settings without running probes. +FEDERATION_OPERATIONAL_OVERRIDE: bool | None = None +# Export API: allow only these source CIDRs (sync container / internal mesh). +FEDERATION_EXPORT_ALLOWED_CIDRS: list[str] = env.list( + "FEDERATION_EXPORT_ALLOWED_CIDRS", + default=[ + "127.0.0.1/32", + "::1/128", + "10.0.0.0/8", + "172.16.0.0/12", + "192.168.0.0/16", + ], +) +FEDERATION_EXPORT_TRUST_X_FORWARDED_FOR: bool = env.bool( + "FEDERATION_EXPORT_TRUST_X_FORWARDED_FOR", + default=False, +) +# Optional: Traefik injects this on internal routes only (empty = IP check only). +FEDERATION_EXPORT_INTERNAL_HEADER_NAME: str = env.str( + "FEDERATION_EXPORT_INTERNAL_HEADER_NAME", + default="X-SDS-Federation-Internal", +) +FEDERATION_EXPORT_INTERNAL_HEADER_SECRET: str = env.str( + "FEDERATION_EXPORT_INTERNAL_HEADER_SECRET", + default="", +) # ADMIN_CONSOLE_ENV is used to visually distinguish between different environments # (production, staging, local) in the admin console and error emails. It does not affect diff --git a/gateway/sds_gateway/api_methods/apps.py b/gateway/sds_gateway/api_methods/apps.py index 2c38194ff..442f6f003 100644 --- a/gateway/sds_gateway/api_methods/apps.py +++ b/gateway/sds_gateway/api_methods/apps.py @@ -14,6 +14,11 @@ class ApiMethodsConfig(AppConfig): def ready(self) -> None: import sds_gateway.api_methods.federation.signals # noqa: F401 import sds_gateway.api_methods.schema # noqa: F401 + from sds_gateway.api_methods.federation.availability import ( + initialize_federation_operational_state, + ) + + initialize_federation_operational_state() silence_unwanted_logs() diff --git a/gateway/sds_gateway/api_methods/federation/availability.py b/gateway/sds_gateway/api_methods/federation/availability.py new file mode 100644 index 000000000..925ef7512 --- /dev/null +++ b/gateway/sds_gateway/api_methods/federation/availability.py @@ -0,0 +1,175 @@ +"""Federation operational status: config, sync health, Redis, sync API key.""" + +from __future__ import annotations + +import ipaddress +import json +import secrets +import time +import urllib.error +import urllib.request +from typing import Any + +from django.conf import settings +from loguru import logger as log + +from sds_gateway.api_methods.models import KeySources +from sds_gateway.users.models import UserAPIKey + +_RECHECK_INTERVAL_SECONDS = 60.0 +_last_evaluated_at: float = 0.0 +_cached_operational: bool = False +_cached_reason: str = "not evaluated" + + +def _setting(name: str, default: Any = None) -> Any: + return getattr(settings, name, default) + + +def _parse_cidrs(raw: list[str]) -> list[ipaddress.IPv4Network | ipaddress.IPv6Network]: + networks: list[ipaddress.IPv4Network | ipaddress.IPv6Network] = [] + for item in raw: + networks.append(ipaddress.ip_network(item.strip(), strict=False)) + return networks + + +def federation_client_ip(request) -> str | None: + """Resolve client IP for federation export access control.""" + trust_forwarded = _setting("FEDERATION_EXPORT_TRUST_X_FORWARDED_FOR", False) + if trust_forwarded: + forwarded = request.META.get("HTTP_X_FORWARDED_FOR", "") + if forwarded: + return forwarded.split(",")[0].strip() + remote = request.META.get("REMOTE_ADDR") + if remote: + return str(remote).strip() + return None + + +def is_client_ip_allowed_for_federation_export(request) -> bool: + cidrs = _parse_cidrs(_setting("FEDERATION_EXPORT_ALLOWED_CIDRS", [])) + if not cidrs: + return False + client_ip = federation_client_ip(request) + if not client_ip: + return False + try: + addr = ipaddress.ip_address(client_ip) + except ValueError: + return False + return any(addr in network for network in cidrs) + + +def is_federation_internal_header_valid(request) -> bool: + secret = _setting("FEDERATION_EXPORT_INTERNAL_HEADER_SECRET", "") + if not secret: + return True + header_name = _setting( + "FEDERATION_EXPORT_INTERNAL_HEADER_NAME", + "X-SDS-Federation-Internal", + ) + meta_key = "HTTP_" + header_name.upper().replace("-", "_") + provided = request.META.get(meta_key, "") + if not provided: + return False + return secrets.compare_digest(str(provided), str(secret)) + + +def _sync_health_ok() -> tuple[bool, str]: + if _setting("FEDERATION_SKIP_SYNC_HEALTH_PROBE", False): + return True, "health probe skipped" + url = (_setting("FEDERATION_SYNC_HEALTH_URL") or "").strip() + if not url: + return False, "FEDERATION_SYNC_HEALTH_URL is not set" + timeout = float(_setting("FEDERATION_SYNC_HEALTH_PROBE_TIMEOUT", 2.0)) + request = urllib.request.Request(url, method="GET") + try: + with urllib.request.urlopen(request, timeout=timeout) as response: + if response.status != 200: + return False, f"sync health returned HTTP {response.status}" + body = response.read().decode("utf-8", errors="replace") + except urllib.error.URLError as exc: + return False, f"sync health probe failed: {exc.reason}" + except TimeoutError: + return False, "sync health probe timed out" + if body: + try: + payload = json.loads(body) + if isinstance(payload, dict) and payload.get("status") == "ok": + return True, "sync health ok" + except json.JSONDecodeError: + pass + return True, "sync health returned 200" + return True, "sync health returned 200" + + +def _sync_api_key_present() -> tuple[bool, str]: + if _setting("FEDERATION_SKIP_SYNC_API_KEY_CHECK", False): + return True, "sync API key check skipped" + exists = UserAPIKey.objects.filter(source=KeySources.FederationSync).exists() + if not exists: + return False, "no FederationSync API key in database" + return True, "FederationSync API key present" + + +def _redis_ok() -> tuple[bool, str]: + if not _setting("FEDERATION_EVENTS_ENABLED", False): + return True, "redis not required (events disabled)" + if _setting("FEDERATION_SKIP_REDIS_PROBE", False): + return True, "redis probe skipped" + from sds_gateway.api_methods.tasks import get_redis_client + + try: + client = get_redis_client() + client.ping() + except Exception as exc: # noqa: BLE001 + return False, f"redis ping failed: {exc}" + return True, "redis ok" + + +def evaluate_federation_operational() -> tuple[bool, str]: + if not _setting("FEDERATION_ENABLED", False): + return False, "FEDERATION_ENABLED is False" + + for check in (_sync_api_key_present, _sync_health_ok, _redis_ok): + ok, reason = check() + if not ok: + return False, reason + return True, "federation operational" + + +def refresh_federation_operational_state(*, force: bool = False) -> tuple[bool, str]: + global _cached_operational, _cached_reason, _last_evaluated_at + + now = time.monotonic() + if ( + not force + and _last_evaluated_at + and (now - _last_evaluated_at) < _RECHECK_INTERVAL_SECONDS + ): + return _cached_operational, _cached_reason + + operational, reason = evaluate_federation_operational() + _cached_operational = operational + _cached_reason = reason + _last_evaluated_at = now + settings.FEDERATION_OPERATIONAL = operational + settings.FEDERATION_OPERATIONAL_REASON = reason + return operational, reason + + +def initialize_federation_operational_state() -> None: + operational, reason = refresh_federation_operational_state(force=True) + if operational: + log.info("Federation is operational: {}", reason) + else: + log.warning("Federation disabled: {}", reason) + + +def is_federation_operational() -> bool: + if _setting("FEDERATION_OPERATIONAL_OVERRIDE", None) is not None: + return bool(_setting("FEDERATION_OPERATIONAL_OVERRIDE")) + if not _setting("FEDERATION_ENABLED", False): + return False + operational, _reason = refresh_federation_operational_state() + return operational diff --git a/gateway/sds_gateway/api_methods/federation/events.py b/gateway/sds_gateway/api_methods/federation/events.py index f1d8ff897..7e90af3a8 100644 --- a/gateway/sds_gateway/api_methods/federation/events.py +++ b/gateway/sds_gateway/api_methods/federation/events.py @@ -11,6 +11,7 @@ from django.conf import settings from loguru import logger as log +from sds_gateway.api_methods.federation.availability import is_federation_operational from sds_gateway.api_methods.models import ItemType from sds_gateway.api_methods.tasks import get_redis_client @@ -25,6 +26,9 @@ def publish_federation_event( timestamp: datetime | None = None, ) -> None: """Notify the local federation sync service via Redis pub/sub.""" + if not is_federation_operational(): + log.debug("Federation not operational, skipping Redis publish") + return channel = getattr(settings, "FEDERATION_EVENTS_CHANNEL", "federation:events") payload: dict[str, Any] = { "event_type": event_type, diff --git a/gateway/sds_gateway/api_methods/federation/permissions.py b/gateway/sds_gateway/api_methods/federation/permissions.py new file mode 100644 index 000000000..61f2ecad8 --- /dev/null +++ b/gateway/sds_gateway/api_methods/federation/permissions.py @@ -0,0 +1,44 @@ +"""Permissions specific to federation export endpoints.""" + +from django.conf import settings +from rest_framework.exceptions import APIException +from rest_framework.permissions import BasePermission + +from sds_gateway.api_methods.federation.availability import ( + is_client_ip_allowed_for_federation_export, +) +from sds_gateway.api_methods.federation.availability import ( + is_federation_internal_header_valid, +) +from sds_gateway.api_methods.federation.availability import is_federation_operational + + +class FederationNotOperational(APIException): + status_code = 503 + default_detail = "Federation is not configured or the sync service is unavailable." + default_code = "federation_unavailable" + + +class IsFederationOperational(BasePermission): + """Deny export when federation failed startup / health checks.""" + + message = FederationNotOperational.default_detail + + def has_permission(self, request, view) -> bool: + if not is_federation_operational(): + detail = getattr(settings, "FEDERATION_OPERATIONAL_REASON", "") or ( + FederationNotOperational.default_detail + ) + raise FederationNotOperational(detail=detail) + return True + + +class IsFederationInternalExportClient(BasePermission): + """Restrict export to internal networks (and optional edge-injected header).""" + + message = "Federation export is only available to internal clients." + + def has_permission(self, request, view) -> bool: + if not is_client_ip_allowed_for_federation_export(request): + return False + return is_federation_internal_header_valid(request) diff --git a/gateway/sds_gateway/api_methods/federation/signals.py b/gateway/sds_gateway/api_methods/federation/signals.py index c65619971..d235aae58 100644 --- a/gateway/sds_gateway/api_methods/federation/signals.py +++ b/gateway/sds_gateway/api_methods/federation/signals.py @@ -7,6 +7,7 @@ from django.dispatch import receiver from loguru import logger as log +from sds_gateway.api_methods.federation.availability import is_federation_operational from sds_gateway.api_methods.federation.events import publish_federation_event from sds_gateway.api_methods.helpers.compile_federated_data import ( is_federation_exportable_capture, @@ -24,6 +25,19 @@ def _event_type(*, created: bool, exportable: bool) -> str: return "deleted" return "created" if created else "updated" +def _skip_signal() -> bool: + if not getattr(settings, "FEDERATION_EVENTS_ENABLED", False): + log.debug( + "FEDERATION_EVENTS_ENABLED is False, " + "skipping federation signal", + ) + return True + + if not is_federation_operational(): + log.debug("Federation not operational, skipping signal") + return True + + return False @receiver(post_save, sender=Dataset) def federation_dataset_changed( @@ -32,11 +46,7 @@ def federation_dataset_changed( created: bool, **kwargs, ) -> None: - if not getattr(settings, "FEDERATION_EVENTS_ENABLED", True): - log.debug( - "FEDERATION_EVENTS_ENABLED is False, " - "skipping federation dataset changed signal", - ) + if _skip_signal(): return exportable = is_federation_exportable_dataset(instance) @@ -55,11 +65,7 @@ def federation_capture_changed( created: bool, **kwargs, ) -> None: - if not getattr(settings, "FEDERATION_EVENTS_ENABLED", True): - log.debug( - "FEDERATION_EVENTS_ENABLED is False, " - "skipping federation capture changed signal", - ) + if _skip_signal(): return exportable = is_federation_exportable_capture(instance) diff --git a/gateway/sds_gateway/api_methods/tests/test_federation_export.py b/gateway/sds_gateway/api_methods/tests/test_federation_export.py index 1c89736a8..7d68dae5f 100644 --- a/gateway/sds_gateway/api_methods/tests/test_federation_export.py +++ b/gateway/sds_gateway/api_methods/tests/test_federation_export.py @@ -1,6 +1,7 @@ """Tests for federation export endpoints and API key scoping.""" from django.contrib.auth import get_user_model +from django.test import override_settings from django.urls import reverse from rest_framework import status from rest_framework.test import APITestCase @@ -14,6 +15,12 @@ User = get_user_model() +@override_settings( + FEDERATION_ENABLED=True, + FEDERATION_OPERATIONAL_OVERRIDE=True, + FEDERATION_EXPORT_ALLOWED_CIDRS=["0.0.0.0/0", "::/0"], + FEDERATION_EXPORT_INTERNAL_HEADER_SECRET="", +) class FederationExportAPITest(APITestCase): def setUp(self) -> None: self.owner = User.objects.create(email="owner@example.com", is_approved=True) @@ -59,7 +66,11 @@ def _auth(self, key: str) -> dict[str, str]: return {"HTTP_AUTHORIZATION": f"Api-Key: {key}"} def test_sync_key_can_list_public_datasets(self) -> None: - response = self.client.get(self.list_datasets_url, **self._auth(self.sync_key)) + response = self.client.get( + self.list_datasets_url, + REMOTE_ADDR="127.0.0.1", + **self._auth(self.sync_key), + ) assert response.status_code == status.HTTP_200_OK uuids = {row["uuid"] for row in response.json()} assert str(self.public_dataset.uuid) in uuids @@ -68,6 +79,7 @@ def test_sync_key_can_list_public_datasets(self) -> None: def test_sync_key_can_retrieve_public_dataset(self) -> None: response = self.client.get( self.detail_dataset_url, + REMOTE_ADDR="127.0.0.1", **self._auth(self.sync_key), ) assert response.status_code == status.HTTP_200_OK @@ -75,7 +87,11 @@ def test_sync_key_can_retrieve_public_dataset(self) -> None: assert response.json()["site_name"] def test_regular_key_denied_on_export(self) -> None: - response = self.client.get(self.list_datasets_url, **self._auth(self.user_key)) + response = self.client.get( + self.list_datasets_url, + REMOTE_ADDR="127.0.0.1", + **self._auth(self.user_key), + ) assert response.status_code == status.HTTP_403_FORBIDDEN def test_sync_key_denied_on_dataset_assets_api(self) -> None: @@ -92,7 +108,11 @@ def test_sync_key_denied_on_capture_list(self) -> None: assert response.status_code == status.HTTP_403_FORBIDDEN def test_export_captures_list(self) -> None: - response = self.client.get(self.list_captures_url, **self._auth(self.sync_key)) + response = self.client.get( + self.list_captures_url, + REMOTE_ADDR="127.0.0.1", + **self._auth(self.sync_key), + ) assert response.status_code == status.HTTP_200_OK uuids = {row["uuid"] for row in response.json()} assert str(self.public_capture.uuid) in uuids diff --git a/gateway/sds_gateway/api_methods/tests/test_federation_hardening.py b/gateway/sds_gateway/api_methods/tests/test_federation_hardening.py new file mode 100644 index 000000000..bc2c97b14 --- /dev/null +++ b/gateway/sds_gateway/api_methods/tests/test_federation_hardening.py @@ -0,0 +1,205 @@ +"""Tests for federation operational checks and export access control.""" + +from __future__ import annotations + +from unittest.mock import MagicMock +from unittest.mock import patch + +import pytest +from django.test import RequestFactory +from django.test import override_settings +from django.urls import reverse +from rest_framework import status +from rest_framework.test import APITestCase + +from sds_gateway.api_methods.federation.availability import evaluate_federation_operational +from sds_gateway.api_methods.federation.availability import ( + is_client_ip_allowed_for_federation_export, +) +from sds_gateway.api_methods.federation.availability import ( + is_federation_internal_header_valid, +) +from sds_gateway.api_methods.federation.availability import refresh_federation_operational_state +from sds_gateway.api_methods.models import DatasetStatus +from sds_gateway.api_methods.models import KeySources +from sds_gateway.api_methods.tests.factories import DatasetFactory +from sds_gateway.users.models import UserAPIKey + +pytestmark = pytest.mark.django_db + + +class TestFederationAvailability: + @override_settings(FEDERATION_ENABLED=False) + def test_disabled_when_master_switch_off(self) -> None: + ok, reason = evaluate_federation_operational() + assert ok is False + assert "FEDERATION_ENABLED" in reason + + @override_settings( + FEDERATION_ENABLED=True, + FEDERATION_SKIP_SYNC_API_KEY_CHECK=True, + FEDERATION_SKIP_SYNC_HEALTH_PROBE=True, + FEDERATION_SKIP_REDIS_PROBE=True, + ) + def test_operational_when_probes_skipped(self) -> None: + ok, reason = evaluate_federation_operational() + assert ok is True + + @override_settings( + FEDERATION_ENABLED=True, + FEDERATION_SKIP_SYNC_HEALTH_PROBE=True, + FEDERATION_SKIP_REDIS_PROBE=True, + ) + def test_fails_without_sync_api_key(self) -> None: + ok, reason = evaluate_federation_operational() + assert ok is False + assert "FederationSync" in reason + + @override_settings( + FEDERATION_ENABLED=True, + FEDERATION_SKIP_SYNC_API_KEY_CHECK=True, + FEDERATION_SYNC_HEALTH_URL="http://sync.test/health", + FEDERATION_SKIP_REDIS_PROBE=True, + ) + @patch("sds_gateway.api_methods.federation.availability.urllib.request.urlopen") + def test_health_probe_success(self, mock_urlopen: MagicMock) -> None: + response = MagicMock() + response.status = 200 + response.read.return_value = b'{"status":"ok"}' + response.__enter__.return_value = response + response.__exit__.return_value = None + mock_urlopen.return_value = response + + ok, _reason = evaluate_federation_operational() + assert ok is True + + @override_settings( + FEDERATION_EXPORT_ALLOWED_CIDRS=["10.0.0.0/8"], + ) + def test_client_ip_allowlist(self) -> None: + factory = RequestFactory() + allowed = factory.get("/", REMOTE_ADDR="10.1.2.3") + denied = factory.get("/", REMOTE_ADDR="203.0.113.8") + assert is_client_ip_allowed_for_federation_export(allowed) is True + assert is_client_ip_allowed_for_federation_export(denied) is False + + @override_settings( + FEDERATION_EXPORT_INTERNAL_HEADER_SECRET="top-secret", + FEDERATION_EXPORT_INTERNAL_HEADER_NAME="X-SDS-Federation-Internal", + ) + def test_internal_header_when_configured(self) -> None: + factory = RequestFactory() + missing = factory.get("/") + valid = factory.get("/", HTTP_X_SDS_FEDERATION_INTERNAL="top-secret") + invalid = factory.get("/", HTTP_X_SDS_FEDERATION_INTERNAL="wrong") + assert is_federation_internal_header_valid(missing) is False + assert is_federation_internal_header_valid(valid) is True + assert is_federation_internal_header_valid(invalid) is False + + @override_settings(FEDERATION_EXPORT_INTERNAL_HEADER_SECRET="") + def test_internal_header_optional_when_secret_empty(self) -> None: + request = RequestFactory().get("/") + assert is_federation_internal_header_valid(request) is True + + @override_settings( + FEDERATION_ENABLED=True, + FEDERATION_SKIP_SYNC_API_KEY_CHECK=True, + FEDERATION_SKIP_SYNC_HEALTH_PROBE=True, + FEDERATION_SKIP_REDIS_PROBE=True, + ) + def test_refresh_sets_settings_flags(self) -> None: + operational, reason = refresh_federation_operational_state(force=True) + from django.conf import settings + + assert operational is True + assert settings.FEDERATION_OPERATIONAL is True + assert reason + + +@override_settings( + FEDERATION_ENABLED=True, + FEDERATION_OPERATIONAL_OVERRIDE=True, + FEDERATION_EXPORT_ALLOWED_CIDRS=["0.0.0.0/0", "::/0"], + FEDERATION_EXPORT_INTERNAL_HEADER_SECRET="", +) +class FederationExportAccessControlTest(APITestCase): + """Export API with operational + network permissions enabled for tests.""" + + def setUp(self) -> None: + from django.contrib.auth import get_user_model + + User = get_user_model() + self.sync_user = User.objects.create( + email="sync@internal.local", + is_approved=True, + ) + _obj, self.sync_key = UserAPIKey.objects.create_key( + name="sync", + user=self.sync_user, + source=KeySources.FederationSync, + ) + owner = User.objects.create(email="owner@example.com", is_approved=True) + self.public_dataset = DatasetFactory( + owner=owner, + is_public=True, + status=DatasetStatus.FINAL, + keywords=None, + ) + self.list_datasets_url = reverse("api:federation-export-datasets-list") + + def _auth(self, key: str) -> dict[str, str]: + return {"HTTP_AUTHORIZATION": f"Api-Key: {key}"} + + def test_sync_key_allowed_from_loopback(self) -> None: + response = self.client.get( + self.list_datasets_url, + REMOTE_ADDR="127.0.0.1", + **self._auth(self.sync_key), + ) + assert response.status_code == status.HTTP_200_OK + + @override_settings( + FEDERATION_EXPORT_ALLOWED_CIDRS=["10.0.0.0/8"], + FEDERATION_OPERATIONAL_OVERRIDE=True, + ) + def test_sync_key_denied_from_public_ip(self) -> None: + response = self.client.get( + self.list_datasets_url, + REMOTE_ADDR="203.0.113.1", + **self._auth(self.sync_key), + ) + assert response.status_code == status.HTTP_403_FORBIDDEN + + @override_settings( + FEDERATION_OPERATIONAL_OVERRIDE=False, + FEDERATION_OPERATIONAL_REASON="sync health probe failed", + FEDERATION_EXPORT_ALLOWED_CIDRS=["0.0.0.0/0"], + ) + def test_export_returns_503_when_not_operational(self) -> None: + response = self.client.get( + self.list_datasets_url, + REMOTE_ADDR="127.0.0.1", + **self._auth(self.sync_key), + ) + assert response.status_code == status.HTTP_503_SERVICE_UNAVAILABLE + + @override_settings( + FEDERATION_EXPORT_INTERNAL_HEADER_SECRET="edge-secret", + FEDERATION_OPERATIONAL_OVERRIDE=True, + FEDERATION_EXPORT_ALLOWED_CIDRS=["127.0.0.1/32"], + ) + def test_internal_header_required_when_configured(self) -> None: + denied = self.client.get( + self.list_datasets_url, + REMOTE_ADDR="127.0.0.1", + **self._auth(self.sync_key), + ) + assert denied.status_code == status.HTTP_403_FORBIDDEN + + allowed = self.client.get( + self.list_datasets_url, + REMOTE_ADDR="127.0.0.1", + HTTP_X_SDS_FEDERATION_INTERNAL="edge-secret", + **self._auth(self.sync_key), + ) + assert allowed.status_code == status.HTTP_200_OK diff --git a/gateway/sds_gateway/api_methods/views/federation_endpoints.py b/gateway/sds_gateway/api_methods/views/federation_endpoints.py index 2f9c06016..087f3472d 100644 --- a/gateway/sds_gateway/api_methods/views/federation_endpoints.py +++ b/gateway/sds_gateway/api_methods/views/federation_endpoints.py @@ -30,6 +30,8 @@ ) from sds_gateway.api_methods.models import Capture from sds_gateway.api_methods.models import Dataset +from sds_gateway.api_methods.federation.permissions import IsFederationInternalExportClient +from sds_gateway.api_methods.federation.permissions import IsFederationOperational from sds_gateway.api_methods.permissions import IsFederationSyncKey @@ -38,7 +40,11 @@ class FederationViewSet(ViewSet): """Internal export endpoints for the federation sync service.""" authentication_classes = [APIKeyAuthentication] - permission_classes = [IsFederationSyncKey] + permission_classes = [ + IsFederationSyncKey, + IsFederationOperational, + IsFederationInternalExportClient, + ] @action(detail=False, methods=["get"], url_path="export/datasets") def export_datasets_list(self, request: Request) -> Response: From 9e3fc32191c912fe0f4567482b7b42bd8b588691 Mon Sep 17 00:00:00 2001 From: klpoland Date: Fri, 19 Jun 2026 13:48:04 -0400 Subject: [PATCH 3/8] pre-commit fixes --- gateway/config/api_router.py | 2 +- gateway/pyproject.toml | 4 ++ gateway/scripts/fallow-cross-file-dupes.sh | 33 +++++++----- gateway/sds_gateway/api_methods/apps.py | 2 +- .../api_methods/federation/availability.py | 51 ++++++++++--------- .../api_methods/federation/events.py | 8 ++- .../api_methods/federation/export_contract.py | 9 +++- .../api_methods/federation/signals.py | 11 ++-- .../helpers/compile_federated_data.py | 11 ++-- .../sds_gateway/api_methods/permissions.py | 6 +-- .../serializers/capture_serializers.py | 3 +- .../tests/test_federation_export_contract.py | 8 +-- .../tests/test_federation_hardening.py | 22 ++++---- .../api_methods/views/federation_endpoints.py | 20 ++++++-- 14 files changed, 114 insertions(+), 76 deletions(-) diff --git a/gateway/config/api_router.py b/gateway/config/api_router.py index 976ff5023..b181398ea 100644 --- a/gateway/config/api_router.py +++ b/gateway/config/api_router.py @@ -5,9 +5,9 @@ from sds_gateway.api_methods.views.auth_endpoints import ValidateAuthViewSet from sds_gateway.api_methods.views.capture_endpoints import CaptureViewSet from sds_gateway.api_methods.views.dataset_endpoints import DatasetViewSet +from sds_gateway.api_methods.views.federation_endpoints import FederationViewSet from sds_gateway.api_methods.views.file_endpoints import FileViewSet from sds_gateway.api_methods.views.file_endpoints import check_contents_exist -from sds_gateway.api_methods.views.federation_endpoints import FederationViewSet from sds_gateway.users.api.views import UserViewSet from sds_gateway.visualizations.api_views import VisualizationViewSet diff --git a/gateway/pyproject.toml b/gateway/pyproject.toml index 4ed42b8b8..a88f1bdee 100644 --- a/gateway/pyproject.toml +++ b/gateway/pyproject.toml @@ -147,6 +147,10 @@ # https://deptry.com/usage/#per-rule-ignores [tool.deptry.per_rule_ignores] + DEP001 = [ + # optional monorepo sibling; contract tests add federation/ to sys.path + "sds_federation", + ] DEP002 = [ # packages that are installed but not imported "argon2-cffi", # used by django for argon2 password hashing diff --git a/gateway/scripts/fallow-cross-file-dupes.sh b/gateway/scripts/fallow-cross-file-dupes.sh index 5d2e91c14..c1ac45594 100755 --- a/gateway/scripts/fallow-cross-file-dupes.sh +++ b/gateway/scripts/fallow-cross-file-dupes.sh @@ -2,24 +2,31 @@ set -euo pipefail cd "$(dirname "$0")/.." -# Use first available: vpx > bunx > npx -if command -v pnpx &>/dev/null; then - RUNNER=pnpx -elif command -v vpx &>/dev/null; then - RUNNER=vpx -elif command -v bunx &>/dev/null; then - RUNNER=bunx -elif command -v npx &>/dev/null; then - RUNNER=npx +LOCAL_FALLOW="node_modules/.bin/fallow" +if [[ -x "${LOCAL_FALLOW}" ]]; then + "${LOCAL_FALLOW}" dupes --format json -q | jq -e ' + [ (.clone_groups // .dupes.clone_groups // [])[] + | select((.instances | map(.file) | unique | length) > 1) + ] | length == 0 +' >/dev/null else - echo "Error: neither vpx, bunx, nor npx found in PATH" >&2 - exit 1 -fi + # Use first available: vpx > bunx > npx (avoid pnpx; global pnpm may need newer Node) + if command -v vpx &>/dev/null; then + RUNNER=(vpx) + elif command -v bunx &>/dev/null; then + RUNNER=(bunx) + elif command -v npx &>/dev/null; then + RUNNER=(npx) + else + echo "Error: neither local fallow, vpx, bunx, nor npx found" >&2 + exit 1 + fi -"${RUNNER}" fallow dupes --format json -q | jq -e ' + "${RUNNER[@]}" fallow dupes --format json -q | jq -e ' [ (.clone_groups // .dupes.clone_groups // [])[] | select((.instances | map(.file) | unique | length) > 1) ] | length == 0 ' >/dev/null +fi echo "No cross-file clone groups detected." diff --git a/gateway/sds_gateway/api_methods/apps.py b/gateway/sds_gateway/api_methods/apps.py index 442f6f003..3d189bbef 100644 --- a/gateway/sds_gateway/api_methods/apps.py +++ b/gateway/sds_gateway/api_methods/apps.py @@ -12,7 +12,7 @@ class ApiMethodsConfig(AppConfig): # pattern to import application modules here in ready() # ruff: noqa: PLC0415 def ready(self) -> None: - import sds_gateway.api_methods.federation.signals # noqa: F401 + import sds_gateway.api_methods.federation.signals import sds_gateway.api_methods.schema # noqa: F401 from sds_gateway.api_methods.federation.availability import ( initialize_federation_operational_state, diff --git a/gateway/sds_gateway/api_methods/federation/availability.py b/gateway/sds_gateway/api_methods/federation/availability.py index 925ef7512..0196caf48 100644 --- a/gateway/sds_gateway/api_methods/federation/availability.py +++ b/gateway/sds_gateway/api_methods/federation/availability.py @@ -14,30 +14,32 @@ from loguru import logger as log from sds_gateway.api_methods.models import KeySources +from sds_gateway.api_methods.tasks import get_redis_client from sds_gateway.users.models import UserAPIKey +_HTTP_OK = 200 _RECHECK_INTERVAL_SECONDS = 60.0 _last_evaluated_at: float = 0.0 _cached_operational: bool = False _cached_reason: str = "not evaluated" -def _setting(name: str, default: Any = None) -> Any: +def _setting(name: str, *, default: Any = None) -> Any: return getattr(settings, name, default) def _parse_cidrs(raw: list[str]) -> list[ipaddress.IPv4Network | ipaddress.IPv6Network]: - networks: list[ipaddress.IPv4Network | ipaddress.IPv6Network] = [] - for item in raw: - networks.append(ipaddress.ip_network(item.strip(), strict=False)) - return networks + return [ipaddress.ip_network(item.strip(), strict=False) for item in raw] def federation_client_ip(request) -> str | None: """Resolve client IP for federation export access control.""" - trust_forwarded = _setting("FEDERATION_EXPORT_TRUST_X_FORWARDED_FOR", False) + trust_forwarded = _setting( + "FEDERATION_EXPORT_TRUST_X_FORWARDED_FOR", + default=False, + ) if trust_forwarded: - forwarded = request.META.get("HTTP_X_FORWARDED_FOR", "") + forwarded = request.headers.get("x-forwarded-for", "") if forwarded: return forwarded.split(",")[0].strip() remote = request.META.get("REMOTE_ADDR") @@ -47,7 +49,7 @@ def federation_client_ip(request) -> str | None: def is_client_ip_allowed_for_federation_export(request) -> bool: - cidrs = _parse_cidrs(_setting("FEDERATION_EXPORT_ALLOWED_CIDRS", [])) + cidrs = _parse_cidrs(_setting("FEDERATION_EXPORT_ALLOWED_CIDRS", default=[])) if not cidrs: return False client_ip = federation_client_ip(request) @@ -61,7 +63,7 @@ def is_client_ip_allowed_for_federation_export(request) -> bool: def is_federation_internal_header_valid(request) -> bool: - secret = _setting("FEDERATION_EXPORT_INTERNAL_HEADER_SECRET", "") + secret = _setting("FEDERATION_EXPORT_INTERNAL_HEADER_SECRET", default="") if not secret: return True header_name = _setting( @@ -75,17 +77,21 @@ def is_federation_internal_header_valid(request) -> bool: return secrets.compare_digest(str(provided), str(secret)) -def _sync_health_ok() -> tuple[bool, str]: - if _setting("FEDERATION_SKIP_SYNC_HEALTH_PROBE", False): +def _sync_health_ok() -> tuple[bool, str]: # noqa: PLR0911 + if _setting("FEDERATION_SKIP_SYNC_HEALTH_PROBE", default=False): return True, "health probe skipped" url = (_setting("FEDERATION_SYNC_HEALTH_URL") or "").strip() if not url: return False, "FEDERATION_SYNC_HEALTH_URL is not set" - timeout = float(_setting("FEDERATION_SYNC_HEALTH_PROBE_TIMEOUT", 2.0)) - request = urllib.request.Request(url, method="GET") + if not url.startswith(("http://", "https://")): + return False, "FEDERATION_SYNC_HEALTH_URL must be http(s)" + timeout = float( + _setting("FEDERATION_SYNC_HEALTH_PROBE_TIMEOUT", default=2.0), + ) + request = urllib.request.Request(url, method="GET") # noqa: S310 try: - with urllib.request.urlopen(request, timeout=timeout) as response: - if response.status != 200: + with urllib.request.urlopen(request, timeout=timeout) as response: # noqa: S310 + if response.status != _HTTP_OK: return False, f"sync health returned HTTP {response.status}" body = response.read().decode("utf-8", errors="replace") except urllib.error.URLError as exc: @@ -104,7 +110,7 @@ def _sync_health_ok() -> tuple[bool, str]: def _sync_api_key_present() -> tuple[bool, str]: - if _setting("FEDERATION_SKIP_SYNC_API_KEY_CHECK", False): + if _setting("FEDERATION_SKIP_SYNC_API_KEY_CHECK", default=False): return True, "sync API key check skipped" exists = UserAPIKey.objects.filter(source=KeySources.FederationSync).exists() if not exists: @@ -113,11 +119,10 @@ def _sync_api_key_present() -> tuple[bool, str]: def _redis_ok() -> tuple[bool, str]: - if not _setting("FEDERATION_EVENTS_ENABLED", False): + if not _setting("FEDERATION_EVENTS_ENABLED", default=False): return True, "redis not required (events disabled)" - if _setting("FEDERATION_SKIP_REDIS_PROBE", False): + if _setting("FEDERATION_SKIP_REDIS_PROBE", default=False): return True, "redis probe skipped" - from sds_gateway.api_methods.tasks import get_redis_client try: client = get_redis_client() @@ -128,7 +133,7 @@ def _redis_ok() -> tuple[bool, str]: def evaluate_federation_operational() -> tuple[bool, str]: - if not _setting("FEDERATION_ENABLED", False): + if not _setting("FEDERATION_ENABLED", default=False): return False, "FEDERATION_ENABLED is False" for check in (_sync_api_key_present, _sync_health_ok, _redis_ok): @@ -139,7 +144,7 @@ def evaluate_federation_operational() -> tuple[bool, str]: def refresh_federation_operational_state(*, force: bool = False) -> tuple[bool, str]: - global _cached_operational, _cached_reason, _last_evaluated_at + global _cached_operational, _cached_reason, _last_evaluated_at # noqa: PLW0603 now = time.monotonic() if ( @@ -167,9 +172,9 @@ def initialize_federation_operational_state() -> None: def is_federation_operational() -> bool: - if _setting("FEDERATION_OPERATIONAL_OVERRIDE", None) is not None: + if _setting("FEDERATION_OPERATIONAL_OVERRIDE", default=None) is not None: return bool(_setting("FEDERATION_OPERATIONAL_OVERRIDE")) - if not _setting("FEDERATION_ENABLED", False): + if not _setting("FEDERATION_ENABLED", default=False): return False operational, _reason = refresh_federation_operational_state() return operational diff --git a/gateway/sds_gateway/api_methods/federation/events.py b/gateway/sds_gateway/api_methods/federation/events.py index 7e90af3a8..82554387f 100644 --- a/gateway/sds_gateway/api_methods/federation/events.py +++ b/gateway/sds_gateway/api_methods/federation/events.py @@ -5,16 +5,20 @@ import json from datetime import UTC from datetime import datetime +from typing import TYPE_CHECKING from typing import Any -from uuid import UUID from django.conf import settings from loguru import logger as log from sds_gateway.api_methods.federation.availability import is_federation_operational -from sds_gateway.api_methods.models import ItemType from sds_gateway.api_methods.tasks import get_redis_client +if TYPE_CHECKING: + from uuid import UUID + + from sds_gateway.api_methods.models import ItemType + FederationEventType = str # created | updated | deleted diff --git a/gateway/sds_gateway/api_methods/federation/export_contract.py b/gateway/sds_gateway/api_methods/federation/export_contract.py index 7d9931f67..0da4b7f90 100644 --- a/gateway/sds_gateway/api_methods/federation/export_contract.py +++ b/gateway/sds_gateway/api_methods/federation/export_contract.py @@ -2,9 +2,11 @@ from __future__ import annotations +from typing import TYPE_CHECKING from typing import Any -from rest_framework.serializers import BaseSerializer +if TYPE_CHECKING: + from rest_framework.serializers import BaseSerializer def serializer_output_field_names(serializer: BaseSerializer[Any]) -> set[str]: @@ -22,5 +24,8 @@ def assert_field_names_match( if expected != actual: missing = expected - actual extra = actual - expected - msg = f"{label} field mismatch: missing={sorted(missing)!r} extra={sorted(extra)!r}" + msg = ( + f"{label} field mismatch: missing={sorted(missing)!r} " + f"extra={sorted(extra)!r}" + ) raise AssertionError(msg) diff --git a/gateway/sds_gateway/api_methods/federation/signals.py b/gateway/sds_gateway/api_methods/federation/signals.py index d235aae58..fa30951a5 100644 --- a/gateway/sds_gateway/api_methods/federation/signals.py +++ b/gateway/sds_gateway/api_methods/federation/signals.py @@ -25,25 +25,26 @@ def _event_type(*, created: bool, exportable: bool) -> str: return "deleted" return "created" if created else "updated" + def _skip_signal() -> bool: if not getattr(settings, "FEDERATION_EVENTS_ENABLED", False): log.debug( - "FEDERATION_EVENTS_ENABLED is False, " - "skipping federation signal", + "FEDERATION_EVENTS_ENABLED is False, skipping federation signal", ) return True - + if not is_federation_operational(): log.debug("Federation not operational, skipping signal") return True return False + @receiver(post_save, sender=Dataset) def federation_dataset_changed( sender: type[Dataset], instance: Dataset, - created: bool, + created: bool, # noqa: FBT001 **kwargs, ) -> None: if _skip_signal(): @@ -62,7 +63,7 @@ def federation_dataset_changed( def federation_capture_changed( sender: type[Capture], instance: Capture, - created: bool, + created: bool, # noqa: FBT001 **kwargs, ) -> None: if _skip_signal(): diff --git a/gateway/sds_gateway/api_methods/helpers/compile_federated_data.py b/gateway/sds_gateway/api_methods/helpers/compile_federated_data.py index 48db49377..365b2c201 100644 --- a/gateway/sds_gateway/api_methods/helpers/compile_federated_data.py +++ b/gateway/sds_gateway/api_methods/helpers/compile_federated_data.py @@ -10,22 +10,21 @@ from sds_gateway.api_methods.models import Capture from sds_gateway.api_methods.models import Dataset from sds_gateway.api_methods.models import DatasetStatus -from sds_gateway.api_methods.serializers.dataset_serializers import ( - get_dataset_serializer, +from sds_gateway.api_methods.serializers.capture_serializers import ( + CaptureFederationSerializer, ) from sds_gateway.api_methods.serializers.dataset_serializers import ( DatasetFederationSerializer, ) -from sds_gateway.api_methods.serializers.capture_serializers import ( - CaptureFederationSerializer, -) if TYPE_CHECKING: from django.db.models import QuerySet def federation_site_name() -> str: - return getattr(settings, "FEDERATION_SITE_NAME", settings.SDS_PROGRAMMATIC_SITE_NAME) + return getattr( + settings, "FEDERATION_SITE_NAME", settings.SDS_PROGRAMMATIC_SITE_NAME + ) def public_datasets_queryset() -> QuerySet[Dataset]: diff --git a/gateway/sds_gateway/api_methods/permissions.py b/gateway/sds_gateway/api_methods/permissions.py index 2af3b8929..2fe7bf2eb 100644 --- a/gateway/sds_gateway/api_methods/permissions.py +++ b/gateway/sds_gateway/api_methods/permissions.py @@ -26,6 +26,6 @@ class DisallowFederationSyncKey(BasePermission): def has_permission(self, request, view) -> bool: key = request.auth - if isinstance(key, UserAPIKey) and key.source == KeySources.FederationSync: - return False - return True + return not ( + isinstance(key, UserAPIKey) and key.source == KeySources.FederationSync + ) diff --git a/gateway/sds_gateway/api_methods/serializers/capture_serializers.py b/gateway/sds_gateway/api_methods/serializers/capture_serializers.py index 56156b777..168178013 100644 --- a/gateway/sds_gateway/api_methods/serializers/capture_serializers.py +++ b/gateway/sds_gateway/api_methods/serializers/capture_serializers.py @@ -12,7 +12,6 @@ from rest_framework.utils.serializer_helpers import ReturnList from sds_gateway.api_methods.helpers.index_handling import retrieve_indexed_metadata -from sds_gateway.api_methods.utils.relationship_utils import get_capture_datasets from sds_gateway.api_methods.models import Capture from sds_gateway.api_methods.models import CaptureType from sds_gateway.api_methods.models import DEPRECATEDPostProcessedData @@ -935,4 +934,4 @@ def get_capture_props(self, obj: Capture) -> dict[str, Any]: def get_dataset_ids(self, obj: Capture) -> list[str]: qs = get_capture_datasets(obj, include_deleted=False) - return [str(dataset.uuid) for dataset in qs] \ No newline at end of file + return [str(dataset.uuid) for dataset in qs] diff --git a/gateway/sds_gateway/api_methods/tests/test_federation_export_contract.py b/gateway/sds_gateway/api_methods/tests/test_federation_export_contract.py index 0c86277d1..910e0731c 100644 --- a/gateway/sds_gateway/api_methods/tests/test_federation_export_contract.py +++ b/gateway/sds_gateway/api_methods/tests/test_federation_export_contract.py @@ -11,6 +11,8 @@ from sds_gateway.api_methods.federation.export_contract import assert_field_names_match from sds_gateway.api_methods.helpers.compile_federated_data import ( compile_federated_capture_doc, +) +from sds_gateway.api_methods.helpers.compile_federated_data import ( compile_federated_dataset_doc, ) from sds_gateway.api_methods.models import DatasetStatus @@ -30,10 +32,8 @@ pytest.importorskip("sds_federation") -from sds_federation.schemas.webhooks import ( # noqa: E402 - FederatedCaptureDoc, - FederatedDatasetDoc, -) +from sds_federation.schemas.webhooks import FederatedCaptureDoc # noqa: E402 +from sds_federation.schemas.webhooks import FederatedDatasetDoc # noqa: E402 User = get_user_model() diff --git a/gateway/sds_gateway/api_methods/tests/test_federation_hardening.py b/gateway/sds_gateway/api_methods/tests/test_federation_hardening.py index bc2c97b14..7a20f8ce1 100644 --- a/gateway/sds_gateway/api_methods/tests/test_federation_hardening.py +++ b/gateway/sds_gateway/api_methods/tests/test_federation_hardening.py @@ -6,25 +6,33 @@ from unittest.mock import patch import pytest +from django.conf import settings +from django.contrib.auth import get_user_model from django.test import RequestFactory from django.test import override_settings from django.urls import reverse from rest_framework import status from rest_framework.test import APITestCase -from sds_gateway.api_methods.federation.availability import evaluate_federation_operational +from sds_gateway.api_methods.federation.availability import ( + evaluate_federation_operational, +) from sds_gateway.api_methods.federation.availability import ( is_client_ip_allowed_for_federation_export, ) from sds_gateway.api_methods.federation.availability import ( is_federation_internal_header_valid, ) -from sds_gateway.api_methods.federation.availability import refresh_federation_operational_state +from sds_gateway.api_methods.federation.availability import ( + refresh_federation_operational_state, +) from sds_gateway.api_methods.models import DatasetStatus from sds_gateway.api_methods.models import KeySources from sds_gateway.api_methods.tests.factories import DatasetFactory from sds_gateway.users.models import UserAPIKey +User = get_user_model() + pytestmark = pytest.mark.django_db @@ -42,7 +50,7 @@ def test_disabled_when_master_switch_off(self) -> None: FEDERATION_SKIP_REDIS_PROBE=True, ) def test_operational_when_probes_skipped(self) -> None: - ok, reason = evaluate_federation_operational() + ok, _reason = evaluate_federation_operational() assert ok is True @override_settings( @@ -84,7 +92,7 @@ def test_client_ip_allowlist(self) -> None: assert is_client_ip_allowed_for_federation_export(denied) is False @override_settings( - FEDERATION_EXPORT_INTERNAL_HEADER_SECRET="top-secret", + FEDERATION_EXPORT_INTERNAL_HEADER_SECRET="top-secret", # noqa: S106 FEDERATION_EXPORT_INTERNAL_HEADER_NAME="X-SDS-Federation-Internal", ) def test_internal_header_when_configured(self) -> None: @@ -109,7 +117,6 @@ def test_internal_header_optional_when_secret_empty(self) -> None: ) def test_refresh_sets_settings_flags(self) -> None: operational, reason = refresh_federation_operational_state(force=True) - from django.conf import settings assert operational is True assert settings.FEDERATION_OPERATIONAL is True @@ -126,9 +133,6 @@ class FederationExportAccessControlTest(APITestCase): """Export API with operational + network permissions enabled for tests.""" def setUp(self) -> None: - from django.contrib.auth import get_user_model - - User = get_user_model() self.sync_user = User.objects.create( email="sync@internal.local", is_approved=True, @@ -184,7 +188,7 @@ def test_export_returns_503_when_not_operational(self) -> None: assert response.status_code == status.HTTP_503_SERVICE_UNAVAILABLE @override_settings( - FEDERATION_EXPORT_INTERNAL_HEADER_SECRET="edge-secret", + FEDERATION_EXPORT_INTERNAL_HEADER_SECRET="edge-secret", # noqa: S106 FEDERATION_OPERATIONAL_OVERRIDE=True, FEDERATION_EXPORT_ALLOWED_CIDRS=["127.0.0.1/32"], ) diff --git a/gateway/sds_gateway/api_methods/views/federation_endpoints.py b/gateway/sds_gateway/api_methods/views/federation_endpoints.py index 087f3472d..d9a215a26 100644 --- a/gateway/sds_gateway/api_methods/views/federation_endpoints.py +++ b/gateway/sds_gateway/api_methods/views/federation_endpoints.py @@ -2,14 +2,22 @@ from __future__ import annotations +from typing import TYPE_CHECKING + from django.shortcuts import get_object_or_404 from drf_spectacular.utils import extend_schema from rest_framework.decorators import action -from rest_framework.request import Request from rest_framework.response import Response from rest_framework.viewsets import ViewSet +if TYPE_CHECKING: + from rest_framework.request import Request + from sds_gateway.api_methods.authentication import APIKeyAuthentication +from sds_gateway.api_methods.federation.permissions import ( + IsFederationInternalExportClient, +) +from sds_gateway.api_methods.federation.permissions import IsFederationOperational from sds_gateway.api_methods.helpers.compile_federated_data import ( compile_federated_capture_doc, ) @@ -30,8 +38,6 @@ ) from sds_gateway.api_methods.models import Capture from sds_gateway.api_methods.models import Dataset -from sds_gateway.api_methods.federation.permissions import IsFederationInternalExportClient -from sds_gateway.api_methods.federation.permissions import IsFederationOperational from sds_gateway.api_methods.permissions import IsFederationSyncKey @@ -59,7 +65,9 @@ def export_datasets_list(self, request: Request) -> Response: methods=["get"], url_path=r"export/datasets/(?P[^/.]+)", ) - def export_dataset_detail(self, request: Request, pk: str | None = None) -> Response: + def export_dataset_detail( + self, request: Request, pk: str | None = None + ) -> Response: """Return one public dataset for sync after a local Redis event.""" dataset = get_object_or_404(Dataset, pk=pk, is_deleted=False) if not is_federation_exportable_dataset(dataset): @@ -82,7 +90,9 @@ def export_captures_list(self, request: Request) -> Response: methods=["get"], url_path=r"export/captures/(?P[^/.]+)", ) - def export_capture_detail(self, request: Request, pk: str | None = None) -> Response: + def export_capture_detail( + self, request: Request, pk: str | None = None + ) -> Response: """Return one public capture for sync after a local Redis event.""" capture = get_object_or_404(Capture, pk=pk, is_deleted=False) if not is_federation_exportable_capture(capture): From 3b3ea8f4d8b34270291c42806097ca975d5b41ee Mon Sep 17 00:00:00 2001 From: klpoland Date: Wed, 24 Jun 2026 16:14:00 -0400 Subject: [PATCH 4/8] arg to kwarg --- gateway/sds_gateway/api_methods/federation/availability.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gateway/sds_gateway/api_methods/federation/availability.py b/gateway/sds_gateway/api_methods/federation/availability.py index 0196caf48..718bf10a1 100644 --- a/gateway/sds_gateway/api_methods/federation/availability.py +++ b/gateway/sds_gateway/api_methods/federation/availability.py @@ -68,7 +68,7 @@ def is_federation_internal_header_valid(request) -> bool: return True header_name = _setting( "FEDERATION_EXPORT_INTERNAL_HEADER_NAME", - "X-SDS-Federation-Internal", + default="X-SDS-Federation-Internal", ) meta_key = "HTTP_" + header_name.upper().replace("-", "_") provided = request.META.get(meta_key, "") From 4bbcbed360f2aca9de6108ccd0b94b400b730ff0 Mon Sep 17 00:00:00 2001 From: klpoland Date: Thu, 25 Jun 2026 11:20:57 -0400 Subject: [PATCH 5/8] fix test errors --- gateway/config/settings/base.py | 2 +- .../users/management/commands/create_test_files.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/gateway/config/settings/base.py b/gateway/config/settings/base.py index 20ceef92b..71455b416 100644 --- a/gateway/config/settings/base.py +++ b/gateway/config/settings/base.py @@ -714,7 +714,7 @@ def _strip_endpoint_scheme(endpoint_url: str) -> str: FEDERATION_SITE_NAME: str = env.str( "FEDERATION_SITE_NAME", - default="", + default="crc", ) # Master switch: when False, federation export and Redis events are inactive. FEDERATION_ENABLED: bool = env.bool("FEDERATION_ENABLED", default=False) diff --git a/gateway/sds_gateway/users/management/commands/create_test_files.py b/gateway/sds_gateway/users/management/commands/create_test_files.py index 9c6f0b553..b85b28e0d 100644 --- a/gateway/sds_gateway/users/management/commands/create_test_files.py +++ b/gateway/sds_gateway/users/management/commands/create_test_files.py @@ -10,9 +10,9 @@ from django.db import transaction from django.utils import timezone -from sds_gateway.captures.models import Capture -from sds_gateway.files.models import File -from sds_gateway.minio_client import MinioClient +from sds_gateway.api_methods.models import Capture +from sds_gateway.api_methods.models import File +from sds_gateway.api_methods.utils.minio_client import get_minio_client User = get_user_model() # This is the only User import we need @@ -69,7 +69,7 @@ def _create_test_files(self, user): directory_path.mkdir(parents=True, exist_ok=True) # Create files - minio_client = MinioClient() + minio_client = get_minio_client() created_files = [] try: From 2c09faef79e62eeb1de9193ac4c099f987d2252c Mon Sep 17 00:00:00 2001 From: klpoland Date: Fri, 26 Jun 2026 11:56:33 -0400 Subject: [PATCH 6/8] clean up env variable settings, ip parsing, auth classes --- gateway/.envs/example/django.env | 22 ++++---- gateway/config/settings/base.py | 53 ++++++++---------- .../api_methods/federation/availability.py | 48 +++++++---------- .../api_methods/federation/events.py | 8 ++- .../api_methods/federation/export_contract.py | 3 +- .../api_methods/federation/permissions.py | 9 +--- .../api_methods/federation/signals.py | 6 +-- .../helpers/compile_federated_data.py | 4 +- .../tests/test_federation_export.py | 4 +- .../tests/test_federation_hardening.py | 54 +++++-------------- .../api_methods/views/dataset_endpoints.py | 2 +- 11 files changed, 82 insertions(+), 131 deletions(-) diff --git a/gateway/.envs/example/django.env b/gateway/.envs/example/django.env index 356c45345..f306a3f6b 100644 --- a/gateway/.envs/example/django.env +++ b/gateway/.envs/example/django.env @@ -10,18 +10,16 @@ API_KEY= # FEDERATION # ------------------------------------------------------------------------------ -# the below environment variables are used for site federation with peers. -# By default, federation events are disabled. -# If FEDERATION_ENABLED is TRUE, deployment will need to include -# federation sync service configuration. -# FEDERATION_ENABLED=true -# FEDERATION_EVENTS_ENABLED=true -# FEDERATION_EVENTS_CHANNEL=federation:events -# FEDERATION_SYNC_USER_EMAIL= -# FEDERATION_SITE_NAME= -# FEDERATION_SYNC_HEALTH_URL=http://federation-sync:8000/sync/health -# FEDERATION_EXPORT_INTERNAL_HEADER_SECRET= -# FEDERATION_EXPORT_TRUST_X_FORWARDED_FOR=false +# Peer sync uses the federation-sync Docker service (same sds-network as gateway; +# service definition lives under /federation). Bootstrap: enable federation, run +# create_federation_sync_api_key, pass the key to federation-sync. Set FEDERATION_SITE_NAME +# (e.g. crc) when enabling federation; use SDS_SITE_FQDN for the public host (RFC [site].fqdn). +# FEDERATION_ENABLED=true # Master switch for export APIs and Redis federation events. +# FEDERATION_SITE_NAME=crc # RFC [site].name (short peer id); set SDS_SITE_FQDN separately for [site].fqdn. +# FEDERATION_EVENTS_CHANNEL=federation:events # Redis pub/sub channel federation-sync subscribes to. +# FEDERATION_SYNC_HEALTH_URL=http://federation-sync:8000/sync/health # Health probe target (federation-sync service). +# FEDERATION_SYNC_USER_EMAIL=federation-sync@internal.local # Service user email for create_federation_sync_api_key. +# FEDERATION_EXPORT_ALLOWED_CIDRS= # Comma-separated CIDRs allowed to call export (default: private Docker ranges). # AUTH0 # ------------------------------------------------------------------------------ diff --git a/gateway/config/settings/base.py b/gateway/config/settings/base.py index 71455b416..e254b1f61 100644 --- a/gateway/config/settings/base.py +++ b/gateway/config/settings/base.py @@ -1,6 +1,7 @@ """Base settings to build other settings files upon.""" # ruff: noqa: ERA001 +import ipaddress import random import string from pathlib import Path @@ -23,6 +24,8 @@ def __get_random_token(length: int) -> str: __rng.choice(string.ascii_letters + string.digits) for _ in range(length) ) +def _parse_cidrs(raw: list[str]) -> list[ipaddress.IPv4Network | ipaddress.IPv6Network]: + return [ipaddress.ip_network(item.strip(), strict=False) for item in raw] env.read_env() @@ -712,13 +715,10 @@ def _strip_endpoint_scheme(endpoint_url: str) -> str: SDS_PROGRAMMATIC_SITE_NAME: str = env.str("SDS_PROGRAMMATIC_SITE_NAME", default="sds") SDS_SITE_FQDN: str = env.str("SDS_SITE_FQDN", default="localhost") -FEDERATION_SITE_NAME: str = env.str( - "FEDERATION_SITE_NAME", - default="crc", -) +# Federation peer short name (RFC [site].name, e.g. crc, haystack); not SDS_PROGRAMMATIC_SITE_NAME. +FEDERATION_SITE_NAME: str = env.str("FEDERATION_SITE_NAME", default="").strip() # Master switch: when False, federation export and Redis events are inactive. FEDERATION_ENABLED: bool = env.bool("FEDERATION_ENABLED", default=False) -FEDERATION_EVENTS_ENABLED: bool = env.bool("FEDERATION_EVENTS_ENABLED", default=False) FEDERATION_EVENTS_CHANNEL: str = env.str( "FEDERATION_EVENTS_CHANNEL", default="federation:events", @@ -727,8 +727,10 @@ def _strip_endpoint_scheme(endpoint_url: str) -> str: "FEDERATION_SYNC_USER_EMAIL", default="federation-sync@internal.local", ) -# Sync service health (e.g. http://federation-sync:8000/sync/health). -FEDERATION_SYNC_HEALTH_URL: str = env.str("FEDERATION_SYNC_HEALTH_URL", default="") +FEDERATION_SYNC_HEALTH_URL: str = env.str( + "FEDERATION_SYNC_HEALTH_URL", + default="http://federation-sync:8000/sync/health", +) FEDERATION_SYNC_HEALTH_PROBE_TIMEOUT: float = env.float( "FEDERATION_SYNC_HEALTH_PROBE_TIMEOUT", default=2.0, @@ -750,29 +752,20 @@ def _strip_endpoint_scheme(endpoint_url: str) -> str: FEDERATION_OPERATIONAL_REASON: str = "" # Tests may set via override_settings without running probes. FEDERATION_OPERATIONAL_OVERRIDE: bool | None = None -# Export API: allow only these source CIDRs (sync container / internal mesh). -FEDERATION_EXPORT_ALLOWED_CIDRS: list[str] = env.list( - "FEDERATION_EXPORT_ALLOWED_CIDRS", - default=[ - "127.0.0.1/32", - "::1/128", - "10.0.0.0/8", - "172.16.0.0/12", - "192.168.0.0/16", - ], -) -FEDERATION_EXPORT_TRUST_X_FORWARDED_FOR: bool = env.bool( - "FEDERATION_EXPORT_TRUST_X_FORWARDED_FOR", - default=False, -) -# Optional: Traefik injects this on internal routes only (empty = IP check only). -FEDERATION_EXPORT_INTERNAL_HEADER_NAME: str = env.str( - "FEDERATION_EXPORT_INTERNAL_HEADER_NAME", - default="X-SDS-Federation-Internal", -) -FEDERATION_EXPORT_INTERNAL_HEADER_SECRET: str = env.str( - "FEDERATION_EXPORT_INTERNAL_HEADER_SECRET", - default="", +# Export API: internal Docker/private networks (sync โ†’ django on sds-network). +FEDERATION_EXPORT_ALLOWED_CIDRS: list[ + ipaddress.IPv4Network | ipaddress.IPv6Network +] = _parse_cidrs( + env.list( + "FEDERATION_EXPORT_ALLOWED_CIDRS", + default=[ + "127.0.0.1/32", + "::1/128", + "10.0.0.0/8", + "172.16.0.0/12", + "192.168.0.0/16", + ], + ), ) # ADMIN_CONSOLE_ENV is used to visually distinguish between different environments diff --git a/gateway/sds_gateway/api_methods/federation/availability.py b/gateway/sds_gateway/api_methods/federation/availability.py index 718bf10a1..f497c0758 100644 --- a/gateway/sds_gateway/api_methods/federation/availability.py +++ b/gateway/sds_gateway/api_methods/federation/availability.py @@ -4,7 +4,6 @@ import ipaddress import json -import secrets import time import urllib.error import urllib.request @@ -28,20 +27,20 @@ def _setting(name: str, *, default: Any = None) -> Any: return getattr(settings, name, default) -def _parse_cidrs(raw: list[str]) -> list[ipaddress.IPv4Network | ipaddress.IPv6Network]: - return [ipaddress.ip_network(item.strip(), strict=False) for item in raw] +def _export_allowed_networks() -> list[ipaddress.IPv4Network | ipaddress.IPv6Network]: + """Networks from settings (parsed at startup); tests may override with strings.""" + raw = _setting("FEDERATION_EXPORT_ALLOWED_CIDRS", default=[]) + networks: list[ipaddress.IPv4Network | ipaddress.IPv6Network] = [] + for item in raw: + if isinstance(item, (ipaddress.IPv4Network, ipaddress.IPv6Network)): + networks.append(item) + else: + networks.append(ipaddress.ip_network(str(item).strip(), strict=False)) + return networks def federation_client_ip(request) -> str | None: - """Resolve client IP for federation export access control.""" - trust_forwarded = _setting( - "FEDERATION_EXPORT_TRUST_X_FORWARDED_FOR", - default=False, - ) - if trust_forwarded: - forwarded = request.headers.get("x-forwarded-for", "") - if forwarded: - return forwarded.split(",")[0].strip() + """Client IP for export access control (direct internal connections only).""" remote = request.META.get("REMOTE_ADDR") if remote: return str(remote).strip() @@ -49,7 +48,7 @@ def federation_client_ip(request) -> str | None: def is_client_ip_allowed_for_federation_export(request) -> bool: - cidrs = _parse_cidrs(_setting("FEDERATION_EXPORT_ALLOWED_CIDRS", default=[])) + cidrs = _export_allowed_networks() if not cidrs: return False client_ip = federation_client_ip(request) @@ -62,21 +61,6 @@ def is_client_ip_allowed_for_federation_export(request) -> bool: return any(addr in network for network in cidrs) -def is_federation_internal_header_valid(request) -> bool: - secret = _setting("FEDERATION_EXPORT_INTERNAL_HEADER_SECRET", default="") - if not secret: - return True - header_name = _setting( - "FEDERATION_EXPORT_INTERNAL_HEADER_NAME", - default="X-SDS-Federation-Internal", - ) - meta_key = "HTTP_" + header_name.upper().replace("-", "_") - provided = request.META.get(meta_key, "") - if not provided: - return False - return secrets.compare_digest(str(provided), str(secret)) - - def _sync_health_ok() -> tuple[bool, str]: # noqa: PLR0911 if _setting("FEDERATION_SKIP_SYNC_HEALTH_PROBE", default=False): return True, "health probe skipped" @@ -119,8 +103,8 @@ def _sync_api_key_present() -> tuple[bool, str]: def _redis_ok() -> tuple[bool, str]: - if not _setting("FEDERATION_EVENTS_ENABLED", default=False): - return True, "redis not required (events disabled)" + if not _setting("FEDERATION_ENABLED", default=False): + return True, "redis not required (federation disabled)" if _setting("FEDERATION_SKIP_REDIS_PROBE", default=False): return True, "redis probe skipped" @@ -136,6 +120,10 @@ def evaluate_federation_operational() -> tuple[bool, str]: if not _setting("FEDERATION_ENABLED", default=False): return False, "FEDERATION_ENABLED is False" + site_name = (_setting("FEDERATION_SITE_NAME", default="") or "").strip() + if not site_name: + return False, "FEDERATION_SITE_NAME must be set when federation is enabled" + for check in (_sync_api_key_present, _sync_health_ok, _redis_ok): ok, reason = check() if not ok: diff --git a/gateway/sds_gateway/api_methods/federation/events.py b/gateway/sds_gateway/api_methods/federation/events.py index 82554387f..1fc815847 100644 --- a/gateway/sds_gateway/api_methods/federation/events.py +++ b/gateway/sds_gateway/api_methods/federation/events.py @@ -19,7 +19,13 @@ from sds_gateway.api_methods.models import ItemType -FederationEventType = str # created | updated | deleted +class FederationEventType(StrEnum): + CREATED = "created" + UPDATED = "updated" + DELETED = "deleted" + + def __str__(self) -> str: + return self.value def publish_federation_event( diff --git a/gateway/sds_gateway/api_methods/federation/export_contract.py b/gateway/sds_gateway/api_methods/federation/export_contract.py index 0da4b7f90..0a7d65df1 100644 --- a/gateway/sds_gateway/api_methods/federation/export_contract.py +++ b/gateway/sds_gateway/api_methods/federation/export_contract.py @@ -6,6 +6,7 @@ from typing import Any if TYPE_CHECKING: + from pydantic import BaseModel from rest_framework.serializers import BaseSerializer @@ -15,7 +16,7 @@ def serializer_output_field_names(serializer: BaseSerializer[Any]) -> set[str]: def assert_field_names_match( serializer: BaseSerializer[Any], - pydantic_model: type, + pydantic_model: BaseModel, *, label: str, ) -> None: diff --git a/gateway/sds_gateway/api_methods/federation/permissions.py b/gateway/sds_gateway/api_methods/federation/permissions.py index 61f2ecad8..2936c2e62 100644 --- a/gateway/sds_gateway/api_methods/federation/permissions.py +++ b/gateway/sds_gateway/api_methods/federation/permissions.py @@ -7,9 +7,6 @@ from sds_gateway.api_methods.federation.availability import ( is_client_ip_allowed_for_federation_export, ) -from sds_gateway.api_methods.federation.availability import ( - is_federation_internal_header_valid, -) from sds_gateway.api_methods.federation.availability import is_federation_operational @@ -34,11 +31,9 @@ def has_permission(self, request, view) -> bool: class IsFederationInternalExportClient(BasePermission): - """Restrict export to internal networks (and optional edge-injected header).""" + """Restrict export to internal network clients (sync service on sds-network).""" message = "Federation export is only available to internal clients." def has_permission(self, request, view) -> bool: - if not is_client_ip_allowed_for_federation_export(request): - return False - return is_federation_internal_header_valid(request) + return is_client_ip_allowed_for_federation_export(request) diff --git a/gateway/sds_gateway/api_methods/federation/signals.py b/gateway/sds_gateway/api_methods/federation/signals.py index fa30951a5..c5cc06e04 100644 --- a/gateway/sds_gateway/api_methods/federation/signals.py +++ b/gateway/sds_gateway/api_methods/federation/signals.py @@ -27,10 +27,8 @@ def _event_type(*, created: bool, exportable: bool) -> str: def _skip_signal() -> bool: - if not getattr(settings, "FEDERATION_EVENTS_ENABLED", False): - log.debug( - "FEDERATION_EVENTS_ENABLED is False, skipping federation signal", - ) + if not getattr(settings, "FEDERATION_ENABLED", False): + log.debug("FEDERATION_ENABLED is False, skipping federation signal") return True if not is_federation_operational(): diff --git a/gateway/sds_gateway/api_methods/helpers/compile_federated_data.py b/gateway/sds_gateway/api_methods/helpers/compile_federated_data.py index 365b2c201..53be29f53 100644 --- a/gateway/sds_gateway/api_methods/helpers/compile_federated_data.py +++ b/gateway/sds_gateway/api_methods/helpers/compile_federated_data.py @@ -22,9 +22,7 @@ def federation_site_name() -> str: - return getattr( - settings, "FEDERATION_SITE_NAME", settings.SDS_PROGRAMMATIC_SITE_NAME - ) + return getattr(settings, "FEDERATION_SITE_NAME", "").strip() def public_datasets_queryset() -> QuerySet[Dataset]: diff --git a/gateway/sds_gateway/api_methods/tests/test_federation_export.py b/gateway/sds_gateway/api_methods/tests/test_federation_export.py index 7d68dae5f..9dfe2a864 100644 --- a/gateway/sds_gateway/api_methods/tests/test_federation_export.py +++ b/gateway/sds_gateway/api_methods/tests/test_federation_export.py @@ -17,9 +17,9 @@ @override_settings( FEDERATION_ENABLED=True, + FEDERATION_SITE_NAME="crc", FEDERATION_OPERATIONAL_OVERRIDE=True, FEDERATION_EXPORT_ALLOWED_CIDRS=["0.0.0.0/0", "::/0"], - FEDERATION_EXPORT_INTERNAL_HEADER_SECRET="", ) class FederationExportAPITest(APITestCase): def setUp(self) -> None: @@ -84,7 +84,7 @@ def test_sync_key_can_retrieve_public_dataset(self) -> None: ) assert response.status_code == status.HTTP_200_OK assert response.json()["uuid"] == str(self.public_dataset.uuid) - assert response.json()["site_name"] + assert response.json()["site_name"] == "crc" def test_regular_key_denied_on_export(self) -> None: response = self.client.get( diff --git a/gateway/sds_gateway/api_methods/tests/test_federation_hardening.py b/gateway/sds_gateway/api_methods/tests/test_federation_hardening.py index 7a20f8ce1..0fc2cfe0e 100644 --- a/gateway/sds_gateway/api_methods/tests/test_federation_hardening.py +++ b/gateway/sds_gateway/api_methods/tests/test_federation_hardening.py @@ -20,9 +20,6 @@ from sds_gateway.api_methods.federation.availability import ( is_client_ip_allowed_for_federation_export, ) -from sds_gateway.api_methods.federation.availability import ( - is_federation_internal_header_valid, -) from sds_gateway.api_methods.federation.availability import ( refresh_federation_operational_state, ) @@ -45,6 +42,7 @@ def test_disabled_when_master_switch_off(self) -> None: @override_settings( FEDERATION_ENABLED=True, + FEDERATION_SITE_NAME="crc", FEDERATION_SKIP_SYNC_API_KEY_CHECK=True, FEDERATION_SKIP_SYNC_HEALTH_PROBE=True, FEDERATION_SKIP_REDIS_PROBE=True, @@ -55,6 +53,7 @@ def test_operational_when_probes_skipped(self) -> None: @override_settings( FEDERATION_ENABLED=True, + FEDERATION_SITE_NAME="crc", FEDERATION_SKIP_SYNC_HEALTH_PROBE=True, FEDERATION_SKIP_REDIS_PROBE=True, ) @@ -65,6 +64,7 @@ def test_fails_without_sync_api_key(self) -> None: @override_settings( FEDERATION_ENABLED=True, + FEDERATION_SITE_NAME="crc", FEDERATION_SKIP_SYNC_API_KEY_CHECK=True, FEDERATION_SYNC_HEALTH_URL="http://sync.test/health", FEDERATION_SKIP_REDIS_PROBE=True, @@ -92,25 +92,20 @@ def test_client_ip_allowlist(self) -> None: assert is_client_ip_allowed_for_federation_export(denied) is False @override_settings( - FEDERATION_EXPORT_INTERNAL_HEADER_SECRET="top-secret", # noqa: S106 - FEDERATION_EXPORT_INTERNAL_HEADER_NAME="X-SDS-Federation-Internal", + FEDERATION_ENABLED=True, + FEDERATION_SITE_NAME="", + FEDERATION_SKIP_SYNC_API_KEY_CHECK=True, + FEDERATION_SKIP_SYNC_HEALTH_PROBE=True, + FEDERATION_SKIP_REDIS_PROBE=True, ) - def test_internal_header_when_configured(self) -> None: - factory = RequestFactory() - missing = factory.get("/") - valid = factory.get("/", HTTP_X_SDS_FEDERATION_INTERNAL="top-secret") - invalid = factory.get("/", HTTP_X_SDS_FEDERATION_INTERNAL="wrong") - assert is_federation_internal_header_valid(missing) is False - assert is_federation_internal_header_valid(valid) is True - assert is_federation_internal_header_valid(invalid) is False - - @override_settings(FEDERATION_EXPORT_INTERNAL_HEADER_SECRET="") - def test_internal_header_optional_when_secret_empty(self) -> None: - request = RequestFactory().get("/") - assert is_federation_internal_header_valid(request) is True + def test_fails_without_site_name(self) -> None: + ok, reason = evaluate_federation_operational() + assert ok is False + assert "FEDERATION_SITE_NAME" in reason @override_settings( FEDERATION_ENABLED=True, + FEDERATION_SITE_NAME="crc", FEDERATION_SKIP_SYNC_API_KEY_CHECK=True, FEDERATION_SKIP_SYNC_HEALTH_PROBE=True, FEDERATION_SKIP_REDIS_PROBE=True, @@ -125,9 +120,9 @@ def test_refresh_sets_settings_flags(self) -> None: @override_settings( FEDERATION_ENABLED=True, + FEDERATION_SITE_NAME="crc", FEDERATION_OPERATIONAL_OVERRIDE=True, FEDERATION_EXPORT_ALLOWED_CIDRS=["0.0.0.0/0", "::/0"], - FEDERATION_EXPORT_INTERNAL_HEADER_SECRET="", ) class FederationExportAccessControlTest(APITestCase): """Export API with operational + network permissions enabled for tests.""" @@ -186,24 +181,3 @@ def test_export_returns_503_when_not_operational(self) -> None: **self._auth(self.sync_key), ) assert response.status_code == status.HTTP_503_SERVICE_UNAVAILABLE - - @override_settings( - FEDERATION_EXPORT_INTERNAL_HEADER_SECRET="edge-secret", # noqa: S106 - FEDERATION_OPERATIONAL_OVERRIDE=True, - FEDERATION_EXPORT_ALLOWED_CIDRS=["127.0.0.1/32"], - ) - def test_internal_header_required_when_configured(self) -> None: - denied = self.client.get( - self.list_datasets_url, - REMOTE_ADDR="127.0.0.1", - **self._auth(self.sync_key), - ) - assert denied.status_code == status.HTTP_403_FORBIDDEN - - allowed = self.client.get( - self.list_datasets_url, - REMOTE_ADDR="127.0.0.1", - HTTP_X_SDS_FEDERATION_INTERNAL="edge-secret", - **self._auth(self.sync_key), - ) - assert allowed.status_code == status.HTTP_200_OK diff --git a/gateway/sds_gateway/api_methods/views/dataset_endpoints.py b/gateway/sds_gateway/api_methods/views/dataset_endpoints.py index 6125fa0f0..2ef78566f 100644 --- a/gateway/sds_gateway/api_methods/views/dataset_endpoints.py +++ b/gateway/sds_gateway/api_methods/views/dataset_endpoints.py @@ -55,7 +55,7 @@ def _truthy_query_param(raw: str | None) -> bool: class DatasetViewSet(ViewSet): - authentication_classes = [APIKeyAuthentication] + authentication_classes = [APIKeyAuthentication, SessionAuthentication] def _get_file_objects( self, From 3393c0f72bdb2e2894b75630b165684fe87cd924 Mon Sep 17 00:00:00 2001 From: klpoland Date: Fri, 26 Jun 2026 12:10:11 -0400 Subject: [PATCH 7/8] gateway mirror for health check --- .../api_methods/federation/availability.py | 20 ++++++++++------ .../tests/test_federation_hardening.py | 23 +++++++++++++++++++ 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/gateway/sds_gateway/api_methods/federation/availability.py b/gateway/sds_gateway/api_methods/federation/availability.py index f497c0758..75cd6fd39 100644 --- a/gateway/sds_gateway/api_methods/federation/availability.py +++ b/gateway/sds_gateway/api_methods/federation/availability.py @@ -82,14 +82,20 @@ def _sync_health_ok() -> tuple[bool, str]: # noqa: PLR0911 return False, f"sync health probe failed: {exc.reason}" except TimeoutError: return False, "sync health probe timed out" - if body: - try: - payload = json.loads(body) - if isinstance(payload, dict) and payload.get("status") == "ok": - return True, "sync health ok" - except json.JSONDecodeError: - pass + if not body.strip(): return True, "sync health returned 200" + + try: + payload = json.loads(body) + except json.JSONDecodeError: + return True, "sync health returned 200" + + if isinstance(payload, dict): + if payload.get("status") == "ok": + return True, "sync health ok" + status_value = payload.get("status") + return False, f"sync health status is not ok: {status_value!r}" + return True, "sync health returned 200" diff --git a/gateway/sds_gateway/api_methods/tests/test_federation_hardening.py b/gateway/sds_gateway/api_methods/tests/test_federation_hardening.py index 0fc2cfe0e..d52ef7b49 100644 --- a/gateway/sds_gateway/api_methods/tests/test_federation_hardening.py +++ b/gateway/sds_gateway/api_methods/tests/test_federation_hardening.py @@ -81,6 +81,29 @@ def test_health_probe_success(self, mock_urlopen: MagicMock) -> None: ok, _reason = evaluate_federation_operational() assert ok is True + @override_settings( + FEDERATION_ENABLED=True, + FEDERATION_SITE_NAME="crc", + FEDERATION_SKIP_SYNC_API_KEY_CHECK=True, + FEDERATION_SYNC_HEALTH_URL="http://sync.test/health", + FEDERATION_SKIP_REDIS_PROBE=True, + ) + @patch("sds_gateway.api_methods.federation.availability.urllib.request.urlopen") + def test_health_probe_fails_when_json_status_not_ok( + self, + mock_urlopen: MagicMock, + ) -> None: + response = MagicMock() + response.status = 200 + response.read.return_value = b'{"status":"degraded"}' + response.__enter__.return_value = response + response.__exit__.return_value = None + mock_urlopen.return_value = response + + ok, reason = evaluate_federation_operational() + assert ok is False + assert "not ok" in reason + @override_settings( FEDERATION_EXPORT_ALLOWED_CIDRS=["10.0.0.0/8"], ) From 751d03aff6f8bb57481b7bdd5d593d1262bade2f Mon Sep 17 00:00:00 2001 From: klpoland Date: Thu, 2 Jul 2026 14:22:53 -0400 Subject: [PATCH 8/8] follow rfc 12: site-prefixed redis channels --- gateway/config/settings/base.py | 11 ++- .../api_methods/federation/events.py | 5 +- .../api_methods/federation/redis_channel.py | 22 +++++ .../tests/test_federation_events.py | 82 +++++++++++++++++++ 4 files changed, 116 insertions(+), 4 deletions(-) create mode 100644 gateway/sds_gateway/api_methods/federation/redis_channel.py create mode 100644 gateway/sds_gateway/api_methods/tests/test_federation_events.py diff --git a/gateway/config/settings/base.py b/gateway/config/settings/base.py index e254b1f61..d176814f0 100644 --- a/gateway/config/settings/base.py +++ b/gateway/config/settings/base.py @@ -14,6 +14,9 @@ from config.settings.logs import ColoredFormatter from config.settings.utils import guess_admin_console_env from config.settings.utils import guess_max_web_download_size +from sds_gateway.api_methods.federation.redis_channel import ( + resolve_federation_events_channel, +) __rng = random.SystemRandom() @@ -719,9 +722,13 @@ def _strip_endpoint_scheme(endpoint_url: str) -> str: FEDERATION_SITE_NAME: str = env.str("FEDERATION_SITE_NAME", default="").strip() # Master switch: when False, federation export and Redis events are inactive. FEDERATION_ENABLED: bool = env.bool("FEDERATION_ENABLED", default=False) -FEDERATION_EVENTS_CHANNEL: str = env.str( +_federation_events_channel_override: str = env.str( "FEDERATION_EVENTS_CHANNEL", - default="federation:events", + default="", +).strip() +FEDERATION_EVENTS_CHANNEL: str = resolve_federation_events_channel( + site_name=FEDERATION_SITE_NAME, + channel_override=_federation_events_channel_override, ) FEDERATION_SYNC_USER_EMAIL: str = env.str( "FEDERATION_SYNC_USER_EMAIL", diff --git a/gateway/sds_gateway/api_methods/federation/events.py b/gateway/sds_gateway/api_methods/federation/events.py index 1fc815847..5f17e2033 100644 --- a/gateway/sds_gateway/api_methods/federation/events.py +++ b/gateway/sds_gateway/api_methods/federation/events.py @@ -5,6 +5,7 @@ import json from datetime import UTC from datetime import datetime +from enum import StrEnum from typing import TYPE_CHECKING from typing import Any @@ -39,9 +40,9 @@ def publish_federation_event( if not is_federation_operational(): log.debug("Federation not operational, skipping Redis publish") return - channel = getattr(settings, "FEDERATION_EVENTS_CHANNEL", "federation:events") + channel = settings.FEDERATION_EVENTS_CHANNEL payload: dict[str, Any] = { - "event_type": event_type, + "event_type": str(event_type), "item_type": item_type.value, "uuid": str(uuid), "timestamp": (timestamp or datetime.now(UTC)).isoformat(), diff --git a/gateway/sds_gateway/api_methods/federation/redis_channel.py b/gateway/sds_gateway/api_methods/federation/redis_channel.py new file mode 100644 index 000000000..48abf17c0 --- /dev/null +++ b/gateway/sds_gateway/api_methods/federation/redis_channel.py @@ -0,0 +1,22 @@ +"""Redis pub/sub channel naming for federation change events (RFC ยง8).""" + +from __future__ import annotations + + +def resolve_federation_events_channel( + *, + site_name: str = "", + channel_override: str = "", +) -> str: + """Return the Redis channel for local federation events. + + Override ``channel_override`` when set (``FEDERATION_EVENTS_CHANNEL`` env). + Otherwise use ``federation:events:{site_name}`` when ``site_name`` is set. + """ + override = (channel_override or "").strip() + if override: + return override + site = (site_name or "").strip() + if site: + return f"federation:events:{site}" + return "" diff --git a/gateway/sds_gateway/api_methods/tests/test_federation_events.py b/gateway/sds_gateway/api_methods/tests/test_federation_events.py new file mode 100644 index 000000000..62f458105 --- /dev/null +++ b/gateway/sds_gateway/api_methods/tests/test_federation_events.py @@ -0,0 +1,82 @@ +"""Tests for federation Redis event channel naming and publish.""" + +from __future__ import annotations + +from unittest.mock import MagicMock +from unittest.mock import patch +from uuid import uuid4 + +import pytest +from django.test import override_settings + +from sds_gateway.api_methods.federation.events import FederationEventType +from sds_gateway.api_methods.federation.events import publish_federation_event +from sds_gateway.api_methods.federation.redis_channel import ( + resolve_federation_events_channel, +) +from sds_gateway.api_methods.models import ItemType + +pytestmark = pytest.mark.django_db + + +class TestResolveFederationEventsChannel: + def test_site_prefixed_when_site_name_set(self) -> None: + assert ( + resolve_federation_events_channel(site_name="crc") + == "federation:events:crc" + ) + + def test_override_wins_over_site_name(self) -> None: + assert ( + resolve_federation_events_channel( + site_name="crc", + channel_override="federation:events:custom", + ) + == "federation:events:custom" + ) + + def test_empty_when_no_site_and_no_override(self) -> None: + assert resolve_federation_events_channel() == "" + + +class TestPublishFederationEvent: + @override_settings( + FEDERATION_ENABLED=True, + FEDERATION_OPERATIONAL_OVERRIDE=True, + FEDERATION_EVENTS_CHANNEL="federation:events:crc", + ) + @patch("sds_gateway.api_methods.federation.events.get_redis_client") + def test_publish_uses_configured_channel( + self, + mock_get_redis: MagicMock, + ) -> None: + mock_client = MagicMock() + mock_get_redis.return_value = mock_client + item_uuid = uuid4() + + publish_federation_event( + event_type=FederationEventType.CREATED, + item_type=ItemType.DATASET, + uuid=item_uuid, + ) + + mock_client.publish.assert_called_once() + channel, _payload = mock_client.publish.call_args[0] + assert channel == "federation:events:crc" + + @override_settings( + FEDERATION_ENABLED=True, + FEDERATION_OPERATIONAL_OVERRIDE=False, + FEDERATION_EVENTS_CHANNEL="federation:events:crc", + ) + @patch("sds_gateway.api_methods.federation.events.get_redis_client") + def test_skips_publish_when_not_operational( + self, + mock_get_redis: MagicMock, + ) -> None: + publish_federation_event( + event_type=FederationEventType.UPDATED, + item_type=ItemType.CAPTURE, + uuid=uuid4(), + ) + mock_get_redis.assert_not_called()