From 22dace861036f9c45625a98d14ce0aa5e2b65aa0 Mon Sep 17 00:00:00 2001 From: Andrei Markin Date: Mon, 22 Jun 2026 00:26:52 +0400 Subject: [PATCH] feat(executors): change structure of return in execute_batch and workflows * Executor.execute_batch returns mapping between query title and its results (either reports themselves or their written locations) * Workflow.run returns dict with each step and all the execution results of each query in the step * ApiExecutorResponse in server.py now has `full_results` property that contains results of the execution of a batch / query. * Add server tests for workflow * Bug fix for reading relative files in workflows. --- libs/executors/garf/executors/__init__.py | 4 +- .../garf/executors/entrypoints/server.py | 43 +++++--- libs/executors/garf/executors/executor.py | 23 ++-- .../garf/executors/workflows/workflow.py | 4 +- .../executors/workflows/workflow_runner.py | 9 +- .../executors/tests/end-to-end/test_server.py | 101 +++++++++++++++--- .../end-to-end/test_workflow_no_writer.yaml | 29 +++++ .../unit/workflows/test_workflow_runner.py | 2 +- 8 files changed, 169 insertions(+), 46 deletions(-) create mode 100644 libs/executors/tests/end-to-end/test_workflow_no_writer.yaml diff --git a/libs/executors/garf/executors/__init__.py b/libs/executors/garf/executors/__init__.py index 9ca247be..2158e170 100644 --- a/libs/executors/garf/executors/__init__.py +++ b/libs/executors/garf/executors/__init__.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-"""Executors to fetch data from various APIs.""" +"""Fetches data from various APIs and writes them to local/remote locations.""" from __future__ import annotations @@ -22,7 +22,7 @@ 'ApiQueryExecutor', ] -__version__ = '1.5.0' +__version__ = '1.6.0' def validate_version(version: str | None = None): diff --git a/libs/executors/garf/executors/entrypoints/server.py b/libs/executors/garf/executors/entrypoints/server.py index 8290cb87..4246dcc3 100644 --- a/libs/executors/garf/executors/entrypoints/server.py +++ b/libs/executors/garf/executors/entrypoints/server.py @@ -70,9 +70,17 @@ class ApiExecutorResponse(pydantic.BaseModel): Attributes: results: Results of query execution. + full_results: Mapping between query names and results of execution. """ results: list[Union[str, Any]] + full_results: dict[str, Any] | None = None + + def model_post_init(self, __context): + if self.full_results: + for query, data in self.full_results.items(): + if isinstance(data, garf.core.GarfReport): + self.full_results[query] = data.to_list(row_type='dict') @app.exception_handler(garf.core.exceptions.GarfError) @@ -120,7 +128,9 @@ def execute(request: tasks.ApiExecutorRequest) -> ApiExecutorResponse: 1, attributes={'executor.source': request.source} ) result = tasks.execute(request.model_dump()) - return ApiExecutorResponse(results=result) + return ApiExecutorResponse( + results=result, full_results={request.title: result} + ) @app.post('/api/execute:task', status_code=fastapi.status.HTTP_202_ACCEPTED) @@ -145,7 +155,7 @@ def execute_batch( n_queries, attributes={'executor.source': request.source} ) results = tasks.execute_batch(request.model_dump()) - return ApiExecutorResponse(results=results) + return ApiExecutorResponse(results=list(results.keys()), full_results=results) @app.post( @@ -174,7 +184,7 @@ def execute_workflow( selected_aliases: Optional[list[str]] = None, skipped_aliases: Optional[list[str]] = None, simulate: bool = False, -) -> list[str]: +) -> dict[str, 
ApiExecutorResponse]: """Runs garf workflow till completion.""" telemetry.workflow_requested.add(1) try: @@ -184,12 +194,18 @@ def execute_workflow( except workflow.GarfWorkflowError as e: telemetry.workflow_error_counter.add(1) raise fastapi.HTTPException(404, detail=str(e)) - return tasks.execute_workflow( + workflow_results = tasks.execute_workflow( execution_workflow=execution_workflow.model_dump(), selected_aliases=selected_aliases, skipped_aliases=skipped_aliases, simulate=simulate, ) + return { + step: ApiExecutorResponse( + results=list(results.keys()), full_results=results + ) + for step, results in workflow_results.items() + } @app.post( @@ -260,24 +276,23 @@ def _init_workflow( config_data = yaml.safe_load(f) except FileNotFoundError as e: raise workflow.GarfWorkflowError('Incorrect config path') from e - else: config_data = None if workflow_file: content = workflow_file.file.read() workflow_data = yaml.safe_load(content.decode('utf-8')) - elif workflow_path: + return workflow.Workflow( + **workflow_data, + execution_config=config_data, + ) + if workflow_path: try: - with smart_open.open(workflow_path, 'r', encoding='utf-8') as f: - workflow_data = yaml.safe_load(f) + return workflow.Workflow.from_file( + path=workflow_path, config_file=config_data + ) except FileNotFoundError as e: raise workflow.GarfWorkflowError('Incorrect workflow path') from e - else: - raise workflow.GarfWorkflowError('Neither workflow path nor file provided') - return workflow.Workflow( - **workflow_data, - execution_config=config_data, - ) + raise workflow.GarfWorkflowError('Neither workflow path nor file provided') @typer_app.command() diff --git a/libs/executors/garf/executors/executor.py b/libs/executors/garf/executors/executor.py index 973a9882..0e18f117 100644 --- a/libs/executors/garf/executors/executor.py +++ b/libs/executors/garf/executors/executor.py @@ -14,6 +14,8 @@ """Defines common functionality between executors.""" +from __future__ import annotations + import 
asyncio import functools import inspect @@ -91,7 +93,7 @@ def execute( no_writer_context = context.model_copy(update={'writer': 'unset'}) batch = {f'{title}_{i}': query for i, query in enumerate(query_parts)} results = self.execute_batch(batch=batch, context=no_writer_context) - results = functools.reduce(operator.add, results) + results = functools.reduce(operator.add, list(results.values())) else: try: results = self._execute(query=query_text, title=title, context=context) @@ -138,7 +140,7 @@ def execute_batch( batch: dict[str, str], context: execution_context.ExecutionContext, parallel_threshold: int = 10, - ) -> list[str]: + ) -> dict[str, str | report.GarfReport]: """Executes batch of queries for a common context. If an executor has any pre/post processors, executes them first while @@ -164,9 +166,10 @@ def execute_batch( batch=batch, context=context, parallel_threshold=parallel_threshold ) ) + results = functools.reduce(lambda x, y: x | y, results) else: title, text = next(iter(batch.items())) - results = self.execute(query=text, title=title, context=context) + results = {title: self.execute(query=text, title=title, context=context)} if self.postprocessors: _handle_processors(processors=self.postprocessors, context=context) return results @@ -202,15 +205,17 @@ async def _run( ): semaphore = asyncio.Semaphore(value=parallel_threshold) - async def run_with_semaphore(fn): + async def run_with_semaphore(title, fn): async with semaphore: - return await fn + return {title: await fn} - tasks = [ - self.aexecute(query=query, title=title, context=context) + tasks = { + title: self.aexecute(query=query, title=title, context=context) for title, query in batch.items() - ] - return await asyncio.gather(*(run_with_semaphore(task) for task in tasks)) + } + return await asyncio.gather( + *(run_with_semaphore(title, task) for title, task in tasks.items()) + ) @tracer.start_as_current_span('executor.handle_processors') diff --git 
a/libs/executors/garf/executors/workflows/workflow.py b/libs/executors/garf/executors/workflows/workflow.py index 9fb10b73..54d808d9 100644 --- a/libs/executors/garf/executors/workflows/workflow.py +++ b/libs/executors/garf/executors/workflows/workflow.py @@ -94,12 +94,12 @@ def full_path(self) -> str: if self.prefix: if re.match(_REMOTE_FILES_PATTERN, str(self.prefix)): return urllib.parse.urljoin(self.prefix, self.path) - return self.prefix / self.path + return pathlib.Path(self.prefix) / self.path return self.path @property def text(self) -> str: - return reader_client.read(self.full_path) + return reader_client.read(self.full_path.absolute()) @property def title(self) -> str: diff --git a/libs/executors/garf/executors/workflows/workflow_runner.py b/libs/executors/garf/executors/workflows/workflow_runner.py index 8cc87790..bba6ec8c 100644 --- a/libs/executors/garf/executors/workflows/workflow_runner.py +++ b/libs/executors/garf/executors/workflows/workflow_runner.py @@ -19,6 +19,7 @@ import pathlib import time +import garf.core import yaml from garf.executors import exceptions, setup, telemetry from garf.executors.telemetry import tracer @@ -71,7 +72,7 @@ def run( selected_aliases: list[str] | None = None, skipped_aliases: list[str] | None = None, simulate: bool = False, - ) -> list[str]: + ) -> dict[str, dict[str, str | garf.core.GarfReport]]: span = trace.get_current_span() start_time = time.perf_counter() workflow_attributes = {} @@ -89,7 +90,7 @@ def run( self.workflow.compile() skipped_aliases = skipped_aliases or [] selected_aliases = selected_aliases or [] - execution_results = [] + execution_results = {} logger.info('Starting Garf Workflow...') for i, step in enumerate(self.workflow.steps, 1): step_name = f'{i}-{step.fetcher}' @@ -143,12 +144,12 @@ def run( batch[query.title] = query.text try: step_start_time = time.perf_counter() - query_executor.execute_batch( + results = query_executor.execute_batch( batch, step.context, step.parallel_threshold or 
self.parallel_threshold, ) - execution_results.append(step_name) + execution_results[step_name] = results telemetry.workflow_step_counter.add(1, workflow_step_attributes) step_duration = time.perf_counter() - step_start_time telemetry.workflow_step_histogram.record( diff --git a/libs/executors/tests/end-to-end/test_server.py b/libs/executors/tests/end-to-end/test_server.py index 81c24952..95c82203 100644 --- a/libs/executors/tests/end-to-end/test_server.py +++ b/libs/executors/tests/end-to-end/test_server.py @@ -27,19 +27,18 @@ class TestApiQueryExecutor: query = ( - 'SELECT resource, dimensions.name AS name, metrics.clicks AS clicks ' - 'FROM resource' + 'SELECT dimension.string AS resource, dimensions.string AS name, metrics.int AS clicks ' + 'FROM fake' ) def test_fake_source_from_query_text(self, tmp_path): - fake_data = _SCRIPT_PATH / 'test.json' request = { 'source': 'fake', 'query': self.query, 'title': 'test', 'context': { 'fetcher_parameters': { - 'data_location': str(fake_data), + 'n_rows': 1, }, 'writer': 'csv', 'writer_parameters': {'destination_folder': str(tmp_path)}, @@ -47,20 +46,24 @@ def test_fake_source_from_query_text(self, tmp_path): } response = client.post('/api/execute', json=request) assert response.status_code == fastapi.status.HTTP_200_OK - expected_output = {'results': [f'[CSV] - at {tmp_path}/test.csv']} + tmp_path_output = f'{tmp_path}/test.csv' + query_output_path = f'[CSV] - at {tmp_path_output}' + expected_output = { + 'results': [query_output_path], + 'full_results': {'test': [query_output_path]}, + } assert response.json() == expected_output def test_fake_source_from_query_path(self, tmp_path): query_path = tmp_path / 'query.sql' with pathlib.Path.open(query_path, 'w', encoding='utf-8') as f: f.write(self.query) - fake_data = _SCRIPT_PATH / 'test.json' request = { 'source': 'fake', 'query_path': str(query_path), 'context': { 'fetcher_parameters': { - 'data_location': str(fake_data), + 'n_rows': 1, }, 'writer': 'csv', 
'writer_parameters': {'destination_folder': str(tmp_path)}, @@ -68,7 +71,12 @@ def test_fake_source_from_query_path(self, tmp_path): } response = client.post('/api/execute', json=request) assert response.status_code == fastapi.status.HTTP_200_OK - expected_output = {'results': [f'[CSV] - at {tmp_path}/query.csv']} + tmp_path_output = f'{tmp_path}/query.csv' + query_output_path = f'[CSV] - at {tmp_path_output}' + expected_output = { + 'results': [query_output_path], + 'full_results': {str(query_path): [query_output_path]}, + } assert response.json() == expected_output def test_batch_fake_source_from_query_path(self, tmp_path): @@ -78,7 +86,6 @@ def test_batch_fake_source_from_query_path(self, tmp_path): query_path2 = tmp_path / 'query2.sql' with pathlib.Path.open(query_path2, 'w', encoding='utf-8') as f: f.write(self.query) - fake_data = _SCRIPT_PATH / 'test.json' request = { 'source': 'fake', 'batch': { @@ -87,7 +94,7 @@ def test_batch_fake_source_from_query_path(self, tmp_path): }, 'context': { 'fetcher_parameters': { - 'data_location': str(fake_data), + 'n_rows': 1, }, 'writer': 'csv', 'writer_parameters': {'destination_folder': str(tmp_path)}, @@ -95,8 +102,74 @@ def test_batch_fake_source_from_query_path(self, tmp_path): } response = client.post('/api/execute:batch', json=request) assert response.status_code == fastapi.status.HTTP_200_OK - expected_output = { - f'[CSV] - at {tmp_path}/query1.csv', - f'[CSV] - at {tmp_path}/query2.csv', + expected_results = ['query1', 'query2'] + expected_full_results = { + 'query1': f'[CSV] - at {tmp_path}/query1.csv', + 'query2': f'[CSV] - at {tmp_path}/query2.csv', + } + assert response.json().get('results') == expected_results + assert response.json().get('full_results') == expected_full_results + + def test_batch_returns_report_results_with_no_writer(self, tmp_path): + query_path1 = tmp_path / 'query1.sql' + with pathlib.Path.open(query_path1, 'w', encoding='utf-8') as f: + f.write(self.query) + request = { + 'source': 
'fake', + 'batch': { + 'query1': self.query, + }, + 'context': { + 'fetcher_parameters': { + 'n_rows': 1, + }, + }, + } + response = client.post('/api/execute:batch', json=request) + assert response.status_code == fastapi.status.HTTP_200_OK + expected_results = ['query1'] + assert response.json().get('results') == expected_results + expected_report_columns = ['resource', 'name', 'clicks'] + for query, result in response.json().get('full_results').items(): + assert list(result[0].keys()) == expected_report_columns + + def test_workflow_from_file(self): + workflow_path = _SCRIPT_PATH / 'test_workflow.yaml' + request = { + 'workflow_path': str(workflow_path), + } + response = client.post('/api/execute:workflow', params=request) + assert response.status_code == fastapi.status.HTTP_200_OK + expected_response = { + '1-fake-test': { + 'results': ['query.sql', 'test_query'], + 'full_results': { + 'query.sql': '[Console] - query', + 'test_query': '[Console] - test_query', + }, + }, + } + assert response.json() == expected_response + + def test_workflow_from_file_no_writer(self): + workflow_path = _SCRIPT_PATH / 'test_workflow_no_writer.yaml' + request = { + 'workflow_path': str(workflow_path), + } + response = client.post('/api/execute:workflow', params=request) + assert response.status_code == fastapi.status.HTTP_200_OK + expected_response = { + '1-fake-test': { + 'results': ['test_query'], + 'full_results': { + 'test_query': [ + { + 'resource': 'Campaign A', + 'name': 'Ad Group 1', + 'clicks': 1500, + }, + ], + }, + }, } - assert set(response.json().get('results')) == expected_output + assert response.json() == expected_response diff --git a/libs/executors/tests/end-to-end/test_workflow_no_writer.yaml b/libs/executors/tests/end-to-end/test_workflow_no_writer.yaml new file mode 100644 index 00000000..29d730d5 --- /dev/null +++ b/libs/executors/tests/end-to-end/test_workflow_no_writer.yaml @@ -0,0 +1,29 @@ +name: test_workflow_no_writer +metadata: + description: Test 
Workflow + version: 0.0.0 + required_garf_version: 0.0.0 + required_fetchers: + google-ads: 0.0.0 + +steps: + - alias: test + fetcher: fake + queries: + - query: + title: test_query + text: | + SELECT + resource, + dimensions.name AS name, + metrics.clicks AS clicks + FROM resource + fetcher_parameters: + data: + - resource: Campaign A + dimensions: + name: Ad Group 1 + id: 101 + metrics: + clicks: 1500 + cost: 250.75 diff --git a/libs/executors/tests/unit/workflows/test_workflow_runner.py b/libs/executors/tests/unit/workflows/test_workflow_runner.py index 8ae6ae74..47c7f584 100644 --- a/libs/executors/tests/unit/workflows/test_workflow_runner.py +++ b/libs/executors/tests/unit/workflows/test_workflow_runner.py @@ -26,7 +26,7 @@ def test_run_returns_executed_step_names(self): workflow = Workflow.from_file(_TEST_WORKFLOW_PATH) runner = workflow_runner.WorkflowRunner(execution_workflow=workflow) results = runner.run() - assert results == ['1-fake-test'] + assert '1-fake-test' in results def test_compile_saves_file(self, tmp_path): tmp_workflow_path = tmp_path / 'workflow.yaml'