-
Notifications
You must be signed in to change notification settings - Fork 25
Expand file tree
/
Copy pathclient.py
More file actions
85 lines (70 loc) · 3.48 KB
/
client.py
File metadata and controls
85 lines (70 loc) · 3.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import json
from datetime import timedelta
from typing import Any, Optional
import azure.functions as func
from durabletask.entities import EntityInstanceId
from durabletask.client import TaskHubGrpcClient
from durabletask.azurefunctions.internal.azurefunctions_grpc_interceptor import AzureFunctionsDefaultClientInterceptorImpl
# Client class used for Durable Functions
class DurableFunctionsClient(TaskHubGrpcClient):
taskHubName: str
connectionName: str
creationUrls: dict[str, str]
managementUrls: dict[str, str]
baseUrl: str
requiredQueryStringParameters: str
rpcBaseUrl: str
httpBaseUrl: str
maxGrpcMessageSizeInBytes: int
grpcHttpClientTimeout: timedelta
def __init__(self, client_as_string: str):
client = json.loads(client_as_string)
self.taskHubName = client.get("taskHubName", "")
self.connectionName = client.get("connectionName", "")
self.creationUrls = client.get("creationUrls", {})
self.managementUrls = client.get("managementUrls", {})
self.baseUrl = client.get("baseUrl", "")
self.requiredQueryStringParameters = client.get("requiredQueryStringParameters", "")
self.rpcBaseUrl = client.get("rpcBaseUrl", "")
self.httpBaseUrl = client.get("httpBaseUrl", "")
self.maxGrpcMessageSizeInBytes = client.get("maxGrpcMessageSizeInBytes", 0)
# TODO: convert the string value back to timedelta - annoying regex?
self.grpcHttpClientTimeout = client.get("grpcHttpClientTimeout", timedelta(seconds=30))
interceptors = [AzureFunctionsDefaultClientInterceptorImpl(self.taskHubName, self.requiredQueryStringParameters)]
# We pass in None for the metadata so we don't construct an additional interceptor in the parent class
# Since the parent class doesn't use anything metadata for anything else, we can set it as None
super().__init__(
host_address=self.rpcBaseUrl,
secure_channel=False,
metadata=None,
interceptors=interceptors)
def create_check_status_response(self, request: func.HttpRequest, instance_id: str) -> func.HttpResponse:
"""Creates an HTTP response for checking the status of a Durable Function instance.
Args:
request (func.HttpRequest): The incoming HTTP request.
instance_id (str): The ID of the Durable Function instance.
"""
raise NotImplementedError("This method is not implemented yet.")
def create_http_management_payload(self, instance_id: str) -> dict[str, str]:
"""Creates an HTTP management payload for a Durable Function instance.
Args:
instance_id (str): The ID of the Durable Function instance.
"""
raise NotImplementedError("This method is not implemented yet.")
def read_entity_state(
self,
entity_id: EntityInstanceId,
task_hub_name: Optional[str],
connection_name: Optional[str]
) -> tuple[bool, Any]:
"""Reads the state of a Durable Entity.
Args:
entity_id (str): The ID of the Durable Entity.
task_hub_name (Optional[str]): The name of the task hub.
connection_name (Optional[str]): The name of the connection.
Returns:
(bool, Any): A tuple containing a boolean indicating if the entity exists and its state.
"""
raise NotImplementedError("This method is not implemented yet.")