-
Notifications
You must be signed in to change notification settings - Fork 25
Expand file tree
/
Copy pathentity_metadata.py
More file actions
98 lines (81 loc) · 3.73 KB
/
entity_metadata.py
File metadata and controls
98 lines (81 loc) · 3.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
from datetime import datetime, timezone
from typing import Any, Optional, Type, TypeVar, Union, overload
from durabletask.entities.entity_instance_id import EntityInstanceId
import durabletask.internal.orchestrator_service_pb2 as pb
TState = TypeVar("TState")
class EntityMetadata:
"""Class representing the metadata of a durable entity.
This class encapsulates the metadata information of a durable entity, allowing for
easy access and manipulation of the entity's metadata within the Durable Task
Framework.
Attributes:
id (EntityInstanceId): The unique identifier of the entity instance.
last_modified (datetime): The timestamp of the last modification to the entity.
backlog_queue_size (int): The size of the backlog queue for the entity.
locked_by (str): The identifier of the worker that currently holds the lock on the entity.
includes_state (bool): Indicates whether the metadata includes the state of the entity.
state (Optional[Any]): The current state of the entity, if included.
"""
def __init__(self,
id: EntityInstanceId[Any, Any],
last_modified: datetime,
backlog_queue_size: int,
locked_by: str,
includes_state: bool,
state: Optional[Any]):
"""Initializes a new instance of the EntityMetadata class.
Args:
value: The initial state value of the entity.
"""
self.id = id
self.last_modified = last_modified
self.backlog_queue_size = backlog_queue_size
self._locked_by = locked_by
self.includes_state = includes_state
self._state = state
@staticmethod
def from_entity_response(entity_response: pb.GetEntityResponse, includes_state: bool):
try:
entity_id = EntityInstanceId.parse(entity_response.entity.instanceId)
except ValueError:
raise ValueError("Invalid entity instance ID in entity response.")
entity_state = None
if includes_state:
entity_state = entity_response.entity.serializedState.value
return EntityMetadata(
id=entity_id,
last_modified=entity_response.entity.lastModifiedTime.ToDatetime(timezone.utc),
backlog_queue_size=entity_response.entity.backlogQueueSize,
locked_by=entity_response.entity.lockedBy.value,
includes_state=includes_state,
state=entity_state
)
@overload
def get_state(self, intended_type: Type[TState]) -> Optional[TState]:
...
@overload
def get_state(self, intended_type: None = None) -> Any:
...
def get_state(self, intended_type: Optional[Type[TState]] = None) -> Union[None, TState, Any]:
"""Get the current state of the entity, optionally converting it to a specified type."""
if intended_type is None or self._state is None:
return self._state
if isinstance(self._state, intended_type):
return self._state
try:
return intended_type(self._state) # type: ignore[call-arg]
except Exception as ex:
raise TypeError(
f"Could not convert state of type '{type(self._state).__name__}' to '{intended_type.__name__}'"
) from ex
def get_locked_by(self) -> Optional[EntityInstanceId]:
"""Get the identifier of the worker that currently holds the lock on the entity.
Returns
-------
str
The identifier of the worker that currently holds the lock on the entity.
"""
if not self._locked_by:
return None
# Will throw ValueError if the format is invalid
return EntityInstanceId.parse(self._locked_by)