diff --git a/src/frequenz/client/common/microgrid/electrical_components/__init__.py b/src/frequenz/client/common/microgrid/electrical_components/__init__.py index 1189b92e..2c7ff71a 100644 --- a/src/frequenz/client/common/microgrid/electrical_components/__init__.py +++ b/src/frequenz/client/common/microgrid/electrical_components/__init__.py @@ -18,6 +18,7 @@ from ._crypto_miner import CryptoMiner from ._diagnostic_code import ElectricalComponentDiagnosticCode from ._electrical_component import ElectricalComponent +from ._electrical_component_connection import ElectricalComponentConnection from ._electrolyzer import Electrolyzer from ._ev_charger import ( AcEvCharger, @@ -75,6 +76,7 @@ "DcEvCharger", "ElectricalComponent", "ElectricalComponentCategory", + "ElectricalComponentConnection", "ElectricalComponentDiagnosticCode", "ElectricalComponentId", "ElectricalComponentStateCode", diff --git a/src/frequenz/client/common/microgrid/electrical_components/_electrical_component_connection.py b/src/frequenz/client/common/microgrid/electrical_components/_electrical_component_connection.py new file mode 100644 index 00000000..44b943d4 --- /dev/null +++ b/src/frequenz/client/common/microgrid/electrical_components/_electrical_component_connection.py @@ -0,0 +1,71 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Electrical component connection.""" + +import dataclasses +from datetime import datetime, timezone + +from ...types import Lifetime +from ._ids import ElectricalComponentId + + +@dataclasses.dataclass(frozen=True, kw_only=True) +class ElectricalComponentConnection: + """A single electrical link between two electrical components within a microgrid. + + An electrical component connection represents the physical wiring as viewed from the + grid connection point, if one exists, or from the islanding point, in case of an + islanded microgrids. + + Note: Physical Representation + This object is not about data flow but rather about the physical + electrical connections between electrical components. Therefore, the IDs for the + source and destination electrical components correspond to the actual setup within + the microgrid. + + Note: Direction + The direction of the connection follows the flow of current away from the + grid connection point, or in case of islands, away from the islanding + point. This direction is aligned with positive current according to the + [Passive Sign Convention] + (https://en.wikipedia.org/wiki/Passive_sign_convention). + + Note: Historical Data + The timestamps of when a connection was created and terminated allow for + tracking the changes over time to a microgrid, providing insights into + when and how the microgrid infrastructure has been modified. + """ + + source: ElectricalComponentId + """The unique identifier of the electrical component where the connection originates. + + This is aligned with the direction of current flow away from the grid connection + point, or in case of islands, away from the islanding point. + """ + + destination: ElectricalComponentId + """The unique ID of the electrical component where the connection terminates. + + This is the electrical component towards which the current flows. + """ + + operational_lifetime: Lifetime = dataclasses.field(default_factory=Lifetime) + """The operational lifetime of the connection.""" + + def __post_init__(self) -> None: + """Ensure that the source and destination electrical components are different.""" + if self.source == self.destination: + raise ValueError("Source and destination components must be different") + + def is_operational_at(self, timestamp: datetime) -> bool: + """Check whether this connection is operational at a specific timestamp.""" + return self.operational_lifetime.is_operational_at(timestamp) + + def is_operational_now(self) -> bool: + """Whether this connection is currently operational.""" + return self.is_operational_at(datetime.now(timezone.utc)) + + def __str__(self) -> str: + """Return a human-readable string representation of this instance.""" + return f"{self.source}->{self.destination}" diff --git a/src/frequenz/client/common/microgrid/electrical_components/proto/v1alpha8/__init__.py b/src/frequenz/client/common/microgrid/electrical_components/proto/v1alpha8/__init__.py index d0f0b603..b2a2f042 100644 --- a/src/frequenz/client/common/microgrid/electrical_components/proto/v1alpha8/__init__.py +++ b/src/frequenz/client/common/microgrid/electrical_components/proto/v1alpha8/__init__.py @@ -13,10 +13,16 @@ electrical_component_state_code_from_proto, electrical_component_state_code_to_proto, ) +from ._electrical_component_connection import ( + electrical_component_connection_from_proto, + electrical_component_connection_from_proto_with_issues, +) __all__ = [ "electrical_component_category_from_proto", "electrical_component_category_to_proto", + "electrical_component_connection_from_proto", + "electrical_component_connection_from_proto_with_issues", "electrical_component_diagnostic_code_from_proto", "electrical_component_diagnostic_code_to_proto", "electrical_component_from_proto", diff --git a/src/frequenz/client/common/microgrid/electrical_components/proto/v1alpha8/_electrical_component_connection.py b/src/frequenz/client/common/microgrid/electrical_components/proto/v1alpha8/_electrical_component_connection.py new file mode 100644 index 00000000..3c78edcb --- /dev/null +++ b/src/frequenz/client/common/microgrid/electrical_components/proto/v1alpha8/_electrical_component_connection.py @@ -0,0 +1,107 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Loading of ElectricalComponentConnection objects from protobuf messages.""" + +import logging + +from frequenz.api.common.v1alpha8.microgrid.electrical_components import ( + electrical_components_pb2, +) + +from .....types import Lifetime +from .....types.proto.v1alpha8 import lifetime_from_proto +from ... import ElectricalComponentConnection, ElectricalComponentId + +_logger = logging.getLogger(__name__) + + +def electrical_component_connection_from_proto( + message: electrical_components_pb2.ElectricalComponentConnection, +) -> ElectricalComponentConnection | None: + """Create an `ElectricalComponentConnection` from a protobuf message.""" + major_issues: list[str] = [] + minor_issues: list[str] = [] + + connection = electrical_component_connection_from_proto_with_issues( + message, major_issues=major_issues, minor_issues=minor_issues + ) + + if major_issues: + _logger.warning( + "Found issues in electrical component connection: %s | Protobuf message:\n%s", + ", ".join(major_issues), + message, + ) + if minor_issues: + _logger.debug( + "Found minor issues in electrical component connection: %s | Protobuf message:\n%s", + ", ".join(minor_issues), + message, + ) + + return connection + + +def electrical_component_connection_from_proto_with_issues( + message: electrical_components_pb2.ElectricalComponentConnection, + *, + major_issues: list[str], + minor_issues: list[str], +) -> ElectricalComponentConnection | None: + """Create an `ElectricalComponentConnection` from a protobuf message collecting issues. + + This function is useful when you want to collect issues during the parsing + of multiple connections, rather than logging them immediately. + + Args: + message: The protobuf message to parse. + major_issues: A list to collect major issues found during parsing. + minor_issues: A list to collect minor issues found during parsing. + + Returns: + An `ElectricalComponentConnection` object created from the protobuf message, + or `None` if the protobuf message is completely invalid and an + `ElectricalComponentConnection` cannot be created. + """ + source_component_id = ElectricalComponentId(message.source_electrical_component_id) + destination_component_id = ElectricalComponentId( + message.destination_electrical_component_id + ) + if source_component_id == destination_component_id: + major_issues.append( + f"connection ignored: source and destination are the same ({source_component_id})", + ) + return None + + lifetime = _get_operational_lifetime_from_proto( + message, major_issues=major_issues, minor_issues=minor_issues + ) + + return ElectricalComponentConnection( + source=source_component_id, + destination=destination_component_id, + operational_lifetime=lifetime, + ) + + +def _get_operational_lifetime_from_proto( + message: electrical_components_pb2.ElectricalComponentConnection, + *, + major_issues: list[str], + minor_issues: list[str], +) -> Lifetime: + """Get the operational lifetime from a protobuf message.""" + if message.HasField("operational_lifetime"): + try: + return lifetime_from_proto(message.operational_lifetime) + except ValueError as exc: + major_issues.append( + f"invalid operational lifetime ({exc}), considering it as missing " + "(i.e. always operational)", + ) + else: + minor_issues.append( + "missing operational lifetime, considering it always operational", + ) + return Lifetime() diff --git a/tests/microgrid/electrical_components/proto/v1alpha8/test_electrical_component_connection.py b/tests/microgrid/electrical_components/proto/v1alpha8/test_electrical_component_connection.py new file mode 100644 index 00000000..218dc2bc --- /dev/null +++ b/tests/microgrid/electrical_components/proto/v1alpha8/test_electrical_component_connection.py @@ -0,0 +1,191 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Tests conversion from protobuf messages to ElectricalComponentConnection.""" + +import logging +from datetime import datetime, timezone +from typing import Any +from unittest.mock import Mock, patch + +import pytest +from frequenz.api.common.v1alpha8.microgrid import lifetime_pb2 +from frequenz.api.common.v1alpha8.microgrid.electrical_components import ( + electrical_components_pb2, +) +from google.protobuf import timestamp_pb2 + +from frequenz.client.common.microgrid.electrical_components import ( + ElectricalComponentConnection, + ElectricalComponentId, +) +from frequenz.client.common.microgrid.electrical_components.proto.v1alpha8 import ( + electrical_component_connection_from_proto, + electrical_component_connection_from_proto_with_issues, +) + + +@pytest.mark.parametrize( + "proto_data, expected_minor_issues", + [ + pytest.param( + { + "source_electrical_component_id": 1, + "destination_electrical_component_id": 2, + "has_lifetime": True, + }, + [], + id="full", + ), + pytest.param( + { + "source_electrical_component_id": 1, + "destination_electrical_component_id": 2, + "has_lifetime": False, + }, + ["missing operational lifetime, considering it always operational"], + id="no_lifetime", + ), + ], +) +def test_success(proto_data: dict[str, Any], expected_minor_issues: list[str]) -> None: + """Test successful conversion to ElectricalComponentConnection.""" + proto = electrical_components_pb2.ElectricalComponentConnection( + source_electrical_component_id=proto_data["source_electrical_component_id"], + destination_electrical_component_id=proto_data[ + "destination_electrical_component_id" + ], + ) + + if proto_data["has_lifetime"]: + now = datetime.now(timezone.utc) + start_time = timestamp_pb2.Timestamp() + start_time.FromDatetime(now) + lifetime = lifetime_pb2.Lifetime() + lifetime.start_timestamp.CopyFrom(start_time) + proto.operational_lifetime.CopyFrom(lifetime) + + major_issues: list[str] = [] + minor_issues: list[str] = [] + connection = electrical_component_connection_from_proto_with_issues( + proto, + major_issues=major_issues, + minor_issues=minor_issues, + ) + + assert connection is not None + assert not major_issues + assert minor_issues == expected_minor_issues + assert connection.source == ElectricalComponentId( + proto_data["source_electrical_component_id"] + ) + assert connection.destination == ElectricalComponentId( + proto_data["destination_electrical_component_id"] + ) + + +def test_error_same_ids() -> None: + """Test proto conversion with the same source and destination returns None.""" + proto = electrical_components_pb2.ElectricalComponentConnection( + source_electrical_component_id=1, destination_electrical_component_id=1 + ) + + major_issues: list[str] = [] + minor_issues: list[str] = [] + conn = electrical_component_connection_from_proto_with_issues( + proto, + major_issues=major_issues, + minor_issues=minor_issues, + ) + + assert conn is None + assert major_issues == [ + "connection ignored: source and destination are the same (CID1)" + ] + assert not minor_issues + + +@patch( + "frequenz.client.common.microgrid.electrical_components.proto.v1alpha8." + "_electrical_component_connection.lifetime_from_proto", + autospec=True, +) +def test_invalid_lifetime(mock_lifetime_from_proto: Mock) -> None: + """Test proto conversion with invalid lifetime data.""" + mock_lifetime_from_proto.side_effect = ValueError("Invalid lifetime") + + proto = electrical_components_pb2.ElectricalComponentConnection( + source_electrical_component_id=1, destination_electrical_component_id=2 + ) + now = datetime.now(timezone.utc) + start_time = timestamp_pb2.Timestamp() + start_time.FromDatetime(now) + lifetime = lifetime_pb2.Lifetime() + lifetime.start_timestamp.CopyFrom(start_time) + proto.operational_lifetime.CopyFrom(lifetime) + + major_issues: list[str] = [] + minor_issues: list[str] = [] + connection = electrical_component_connection_from_proto_with_issues( + proto, major_issues=major_issues, minor_issues=minor_issues + ) + + assert connection is not None + assert connection.source == ElectricalComponentId(1) + assert connection.destination == ElectricalComponentId(2) + assert major_issues == [ + "invalid operational lifetime (Invalid lifetime), considering it as missing " + "(i.e. always operational)" + ] + assert not minor_issues + mock_lifetime_from_proto.assert_called_once_with(proto.operational_lifetime) + + +@patch( + "frequenz.client.common.microgrid.electrical_components.proto.v1alpha8." + "_electrical_component_connection." + "electrical_component_connection_from_proto_with_issues", + autospec=True, +) +def test_issues_logging( + mock_from_proto_with_issues: Mock, caplog: pytest.LogCaptureFixture +) -> None: + """Test collection and logging of issues during proto conversion.""" + caplog.set_level("DEBUG") # Ensure we capture DEBUG level messages + + # mypy needs the explicit return + def _fake_from_proto_with_issues( # pylint: disable=useless-return + _: electrical_components_pb2.ElectricalComponentConnection, + *, + major_issues: list[str], + minor_issues: list[str], + ) -> ElectricalComponentConnection | None: + """Fake function to simulate conversion and logging.""" + major_issues.append("fake major issue") + minor_issues.append("fake minor issue") + return None + + mock_from_proto_with_issues.side_effect = _fake_from_proto_with_issues + + mock_proto = Mock( + name="proto", spec=electrical_components_pb2.ElectricalComponentConnection + ) + connection = electrical_component_connection_from_proto(mock_proto) + + assert connection is None + assert caplog.record_tuples == [ + ( + "frequenz.client.common.microgrid.electrical_components.proto.v1alpha8." + "_electrical_component_connection", + logging.WARNING, + "Found issues in electrical component connection: fake major issue | " + f"Protobuf message:\n{mock_proto}", + ), + ( + "frequenz.client.common.microgrid.electrical_components.proto.v1alpha8." + "_electrical_component_connection", + logging.DEBUG, + "Found minor issues in electrical component connection: fake minor issue | " + f"Protobuf message:\n{mock_proto}", + ), + ] diff --git a/tests/microgrid/electrical_components/test_electrical_component_connection.py b/tests/microgrid/electrical_components/test_electrical_component_connection.py new file mode 100644 index 00000000..0f8c2977 --- /dev/null +++ b/tests/microgrid/electrical_components/test_electrical_component_connection.py @@ -0,0 +1,138 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Tests for ElectricalComponentConnection class and related functionality.""" + +from datetime import datetime, timedelta, timezone +from unittest.mock import Mock, patch + +import pytest + +from frequenz.client.common.microgrid.electrical_components import ( + ElectricalComponentConnection, + ElectricalComponentId, +) +from frequenz.client.common.types import Lifetime + + +def test_creation() -> None: + """Test basic ElectricalComponentConnection creation and validation.""" + now = datetime.now(timezone.utc) + lifetime = Lifetime(start=now) + connection = ElectricalComponentConnection( + source=ElectricalComponentId(1), + destination=ElectricalComponentId(2), + operational_lifetime=lifetime, + ) + + assert connection.source == ElectricalComponentId(1) + assert connection.destination == ElectricalComponentId(2) + assert connection.operational_lifetime == lifetime + + +def test_validation() -> None: + """Test validation of source and destination electrical components.""" + with pytest.raises( + ValueError, match="Source and destination components must be different" + ): + ElectricalComponentConnection( + source=ElectricalComponentId(1), destination=ElectricalComponentId(1) + ) + + +def test_str() -> None: + """Test string representation of ElectricalComponentConnection.""" + connection = ElectricalComponentConnection( + source=ElectricalComponentId(1), destination=ElectricalComponentId(2) + ) + assert str(connection) == "CID1->CID2" + + +def test_equality_and_hash() -> None: + """Test equality and hashing of the frozen ElectricalComponentConnection.""" + lifetime = Lifetime(start=datetime(2025, 1, 1, tzinfo=timezone.utc)) + connection = ElectricalComponentConnection( + source=ElectricalComponentId(1), + destination=ElectricalComponentId(2), + operational_lifetime=lifetime, + ) + same = ElectricalComponentConnection( + source=ElectricalComponentId(1), + destination=ElectricalComponentId(2), + operational_lifetime=lifetime, + ) + different = ElectricalComponentConnection( + source=ElectricalComponentId(1), + destination=ElectricalComponentId(3), + operational_lifetime=lifetime, + ) + + assert connection == same + assert connection != different + assert hash(connection) == hash(same) + assert {connection, same} == {connection} + + +def test_is_operational_at_boundaries() -> None: + """Test is_operational_at boundary semantics with a concrete lifetime.""" + start = datetime(2025, 1, 1, tzinfo=timezone.utc) + end = datetime(2025, 12, 31, tzinfo=timezone.utc) + connection = ElectricalComponentConnection( + source=ElectricalComponentId(1), + destination=ElectricalComponentId(2), + operational_lifetime=Lifetime(start=start, end=end), + ) + + before = start - timedelta(seconds=1) + middle = datetime(2025, 6, 1, tzinfo=timezone.utc) + after = end + timedelta(seconds=1) + + assert connection.is_operational_at(before) is False + assert connection.is_operational_at(start) is True + assert connection.is_operational_at(middle) is True + assert connection.is_operational_at(end) is True + assert connection.is_operational_at(after) is False + + +@pytest.mark.parametrize( + "lifetime_active", [True, False], ids=["operational", "not-operational"] +) +def test_is_operational_at(lifetime_active: bool) -> None: + """Test active_at behavior with lifetime.active values.""" + mock_lifetime = Mock(spec=Lifetime) + mock_lifetime.is_operational_at.return_value = lifetime_active + + connection = ElectricalComponentConnection( + source=ElectricalComponentId(1), + destination=ElectricalComponentId(2), + operational_lifetime=mock_lifetime, + ) + + now = datetime.now(timezone.utc) + assert connection.is_operational_at(now) == lifetime_active + mock_lifetime.is_operational_at.assert_called_once_with(now) + + +@patch( + "frequenz.client.common.microgrid.electrical_components." + "_electrical_component_connection.datetime" +) +@pytest.mark.parametrize( + "lifetime_active", [True, False], ids=["operational", "not-operational"] +) +def test_is_operational_now(mock_datetime: Mock, lifetime_active: bool) -> None: + """Test if the connection is operational at the current time.""" + now = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + mock_datetime.now.side_effect = lambda tz: now.replace(tzinfo=tz) + mock_lifetime = Mock(spec=Lifetime) + mock_lifetime.is_operational_at.return_value = lifetime_active + + connection = ElectricalComponentConnection( + source=ElectricalComponentId(1), + destination=ElectricalComponentId(2), + operational_lifetime=mock_lifetime, + ) + + assert connection.is_operational_now() is lifetime_active + mock_lifetime.is_operational_at.assert_called_once_with(now) + mock_datetime.now.assert_called_once_with(timezone.utc)