-
Notifications
You must be signed in to change notification settings - Fork 25
Expand file tree
/
Copy pathdurable_entity.py
More file actions
97 lines (78 loc) · 3.35 KB
/
durable_entity.py
File metadata and controls
97 lines (78 loc) · 3.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
from typing import Any, Optional, Type, TypeVar, Union, overload
from durabletask.entities.entity_context import EntityContext
from durabletask.entities.entity_instance_id import EntityInstanceId
TState = TypeVar("TState")
TInput = TypeVar("TInput")
class DurableEntity:
def _initialize_entity_context(self, context: EntityContext):
self.entity_context = context
@overload
def get_state(self, intended_type: Type[TState], default: TState) -> TState:
...
@overload
def get_state(self, intended_type: Type[TState]) -> Optional[TState]:
...
@overload
def get_state(self, intended_type: None = None, default: Any = None) -> Any:
...
def get_state(self, intended_type: Optional[Type[TState]] = None, default: Optional[TState] = None) -> Union[None, TState, Any]:
"""Get the current state of the entity, optionally converting it to a specified type.
Parameters
----------
intended_type : Type[TState] | None, optional
The type to which the state should be converted. If None, the state is returned as-is.
default : TState, optional
The default value to return if the state is not found or cannot be converted.
Returns
-------
TState | Any
The current state of the entity, optionally converted to the specified type.
"""
return self.entity_context.get_state(intended_type, default)
def set_state(self, state: Any):
"""Set the state of the entity to a new value.
Parameters
----------
new_state : Any
The new state to set for the entity.
"""
self.entity_context.set_state(state)
def signal_entity(self,
entity_instance_id: EntityInstanceId[TInput, Any],
operation: str,
input: Optional[TInput] = None) -> None:
"""Signal another entity to perform an operation.
Parameters
----------
entity_instance_id : EntityInstanceId
The ID of the entity instance to signal.
operation : str
The operation to perform on the entity.
input : Any, optional
The input to provide to the entity for the operation.
"""
self.entity_context.signal_entity(entity_instance_id, operation, input)
def schedule_new_orchestration(self, orchestration_name: str, input: Optional[Any] = None, instance_id: Optional[str] = None) -> str:
"""Schedule a new orchestration instance.
Parameters
----------
orchestration_name : str
The name of the orchestration to schedule.
input : Any, optional
The input to provide to the new orchestration.
instance_id : str, optional
The instance ID to assign to the new orchestration. If None, a new ID will be generated.
Returns
-------
str
The instance ID of the scheduled orchestration.
"""
return self.entity_context.schedule_new_orchestration(orchestration_name, input, instance_id=instance_id)
def delete(self, input: Any = None) -> None:
"""Delete the entity instance.
Parameters
----------
input : Any, optional
Unused: The input for the entity "delete" operation.
"""
self.set_state(None)