Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions libs/executors/garf/executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Executors to fetch data from various APIs."""
"""Fetches data from various APIs and writes them to local/remote locations."""

from __future__ import annotations

Expand All @@ -22,7 +22,7 @@
'ApiQueryExecutor',
]

__version__ = '1.5.0'
__version__ = '1.6.0'


def validate_version(version: str | None = None):
Expand Down
43 changes: 29 additions & 14 deletions libs/executors/garf/executors/entrypoints/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,17 @@ class ApiExecutorResponse(pydantic.BaseModel):

Attributes:
results: Results of query execution.
full_results: Mapping between query names and results of execution.
"""

results: list[Union[str, Any]]
full_results: dict[str, Any] | None = None

def model_post_init(self, __context):
if self.full_results:
for query, data in self.full_results.items():
if isinstance(data, garf.core.GarfReport):
self.full_results[query] = data.to_list(row_type='dict')


@app.exception_handler(garf.core.exceptions.GarfError)
Expand Down Expand Up @@ -120,7 +128,9 @@ def execute(request: tasks.ApiExecutorRequest) -> ApiExecutorResponse:
1, attributes={'executor.source': request.source}
)
result = tasks.execute(request.model_dump())
return ApiExecutorResponse(results=result)
return ApiExecutorResponse(
results=result, full_results={request.title: result}
)


@app.post('/api/execute:task', status_code=fastapi.status.HTTP_202_ACCEPTED)
Expand All @@ -145,7 +155,7 @@ def execute_batch(
n_queries, attributes={'executor.source': request.source}
)
results = tasks.execute_batch(request.model_dump())
return ApiExecutorResponse(results=results)
return ApiExecutorResponse(results=list(results.keys()), full_results=results)


@app.post(
Expand Down Expand Up @@ -174,7 +184,7 @@ def execute_workflow(
selected_aliases: Optional[list[str]] = None,
skipped_aliases: Optional[list[str]] = None,
simulate: bool = False,
) -> list[str]:
) -> dict[str, ApiExecutorResponse]:
"""Runs garf workflow till completion."""
telemetry.workflow_requested.add(1)
try:
Expand All @@ -184,12 +194,18 @@ def execute_workflow(
except workflow.GarfWorkflowError as e:
telemetry.workflow_error_counter.add(1)
raise fastapi.HTTPException(404, detail=str(e))
return tasks.execute_workflow(
workflow_results = tasks.execute_workflow(
execution_workflow=execution_workflow.model_dump(),
selected_aliases=selected_aliases,
skipped_aliases=skipped_aliases,
simulate=simulate,
)
return {
step: ApiExecutorResponse(
results=list(results.keys()), full_results=results
)
for step, results in workflow_results.items()
}


@app.post(
Expand Down Expand Up @@ -260,24 +276,23 @@ def _init_workflow(
config_data = yaml.safe_load(f)
except FileNotFoundError as e:
raise workflow.GarfWorkflowError('Incorrect config path') from e

else:
config_data = None
if workflow_file:
content = workflow_file.file.read()
workflow_data = yaml.safe_load(content.decode('utf-8'))
elif workflow_path:
return workflow.Workflow(
**workflow_data,
execution_config=config_data,
)
if workflow_path:
try:
with smart_open.open(workflow_path, 'r', encoding='utf-8') as f:
workflow_data = yaml.safe_load(f)
return workflow.Workflow.from_file(
path=workflow_path, config_file=config_data
)
except FileNotFoundError as e:
raise workflow.GarfWorkflowError('Incorrect workflow path') from e
else:
raise workflow.GarfWorkflowError('Neither workflow path nor file provided')
return workflow.Workflow(
**workflow_data,
execution_config=config_data,
)
raise workflow.GarfWorkflowError('Neither workflow path nor file provided')


@typer_app.command()
Expand Down
23 changes: 14 additions & 9 deletions libs/executors/garf/executors/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

"""Defines common functionality between executors."""

from __future__ import annotations

import asyncio
import functools
import inspect
Expand Down Expand Up @@ -91,7 +93,7 @@ def execute(
no_writer_context = context.model_copy(update={'writer': 'unset'})
batch = {f'{title}_{i}': query for i, query in enumerate(query_parts)}
results = self.execute_batch(batch=batch, context=no_writer_context)
results = functools.reduce(operator.add, results)
results = functools.reduce(operator.add, list(results.values()))
else:
try:
results = self._execute(query=query_text, title=title, context=context)
Expand Down Expand Up @@ -138,7 +140,7 @@ def execute_batch(
batch: dict[str, str],
context: execution_context.ExecutionContext,
parallel_threshold: int = 10,
) -> list[str]:
) -> dict[str, str | report.GarfReport]:
"""Executes batch of queries for a common context.

If an executor has any pre/post processors, executes them first while
Expand All @@ -164,9 +166,10 @@ def execute_batch(
batch=batch, context=context, parallel_threshold=parallel_threshold
)
)
results = functools.reduce(lambda x, y: x | y, results)
else:
title, text = next(iter(batch.items()))
results = self.execute(query=text, title=title, context=context)
results = {title: self.execute(query=text, title=title, context=context)}
if self.postprocessors:
_handle_processors(processors=self.postprocessors, context=context)
return results
Expand Down Expand Up @@ -202,15 +205,17 @@ async def _run(
):
semaphore = asyncio.Semaphore(value=parallel_threshold)

async def run_with_semaphore(fn):
async def run_with_semaphore(title, fn):
async with semaphore:
return await fn
return {title: await fn}

tasks = [
self.aexecute(query=query, title=title, context=context)
tasks = {
title: self.aexecute(query=query, title=title, context=context)
for title, query in batch.items()
]
return await asyncio.gather(*(run_with_semaphore(task) for task in tasks))
}
return await asyncio.gather(
*(run_with_semaphore(title, task) for title, task in tasks.items())
)


@tracer.start_as_current_span('executor.handle_processors')
Expand Down
4 changes: 2 additions & 2 deletions libs/executors/garf/executors/workflows/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ def full_path(self) -> str:
if self.prefix:
if re.match(_REMOTE_FILES_PATTERN, str(self.prefix)):
return urllib.parse.urljoin(self.prefix, self.path)
return self.prefix / self.path
return pathlib.Path(self.prefix) / self.path
return self.path

@property
def text(self) -> str:
return reader_client.read(self.full_path)
return reader_client.read(self.full_path.absolute())

@property
def title(self) -> str:
Expand Down
9 changes: 5 additions & 4 deletions libs/executors/garf/executors/workflows/workflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import pathlib
import time

import garf.core
import yaml
from garf.executors import exceptions, setup, telemetry
from garf.executors.telemetry import tracer
Expand Down Expand Up @@ -71,7 +72,7 @@ def run(
selected_aliases: list[str] | None = None,
skipped_aliases: list[str] | None = None,
simulate: bool = False,
) -> list[str]:
) -> dict[str, dict[str, str | garf.core.GarfReport]]:
span = trace.get_current_span()
start_time = time.perf_counter()
workflow_attributes = {}
Expand All @@ -89,7 +90,7 @@ def run(
self.workflow.compile()
skipped_aliases = skipped_aliases or []
selected_aliases = selected_aliases or []
execution_results = []
execution_results = {}
logger.info('Starting Garf Workflow...')
for i, step in enumerate(self.workflow.steps, 1):
step_name = f'{i}-{step.fetcher}'
Expand Down Expand Up @@ -143,12 +144,12 @@ def run(
batch[query.title] = query.text
try:
step_start_time = time.perf_counter()
query_executor.execute_batch(
results = query_executor.execute_batch(
batch,
step.context,
step.parallel_threshold or self.parallel_threshold,
)
execution_results.append(step_name)
execution_results[step_name] = results
telemetry.workflow_step_counter.add(1, workflow_step_attributes)
step_duration = time.perf_counter() - step_start_time
telemetry.workflow_step_histogram.record(
Expand Down
101 changes: 87 additions & 14 deletions libs/executors/tests/end-to-end/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,48 +27,56 @@

class TestApiQueryExecutor:
query = (
'SELECT resource, dimensions.name AS name, metrics.clicks AS clicks '
'FROM resource'
'SELECT dimension.string AS resource, dimensions.string AS name, metrics.int AS clicks '
'FROM fake'
)

def test_fake_source_from_query_text(self, tmp_path):
fake_data = _SCRIPT_PATH / 'test.json'
request = {
'source': 'fake',
'query': self.query,
'title': 'test',
'context': {
'fetcher_parameters': {
'data_location': str(fake_data),
'n_rows': 1,
},
'writer': 'csv',
'writer_parameters': {'destination_folder': str(tmp_path)},
},
}
response = client.post('/api/execute', json=request)
assert response.status_code == fastapi.status.HTTP_200_OK
expected_output = {'results': [f'[CSV] - at {tmp_path}/test.csv']}
tmp_path_output = f'{tmp_path}/test.csv'
query_output_path = f'[CSV] - at {tmp_path_output}'
expected_output = {
'results': [query_output_path],
'full_results': {'test': [query_output_path]},
}
assert response.json() == expected_output

def test_fake_source_from_query_path(self, tmp_path):
query_path = tmp_path / 'query.sql'
with pathlib.Path.open(query_path, 'w', encoding='utf-8') as f:
f.write(self.query)
fake_data = _SCRIPT_PATH / 'test.json'
request = {
'source': 'fake',
'query_path': str(query_path),
'context': {
'fetcher_parameters': {
'data_location': str(fake_data),
'n_rows': 1,
},
'writer': 'csv',
'writer_parameters': {'destination_folder': str(tmp_path)},
},
}
response = client.post('/api/execute', json=request)
assert response.status_code == fastapi.status.HTTP_200_OK
expected_output = {'results': [f'[CSV] - at {tmp_path}/query.csv']}
tmp_path_output = f'{tmp_path}/query.csv'
query_output_path = f'[CSV] - at {tmp_path_output}'
expected_output = {
'results': [query_output_path],
'full_results': {str(query_path): [query_output_path]},
}
assert response.json() == expected_output

def test_batch_fake_source_from_query_path(self, tmp_path):
Expand All @@ -78,7 +86,6 @@ def test_batch_fake_source_from_query_path(self, tmp_path):
query_path2 = tmp_path / 'query2.sql'
with pathlib.Path.open(query_path2, 'w', encoding='utf-8') as f:
f.write(self.query)
fake_data = _SCRIPT_PATH / 'test.json'
request = {
'source': 'fake',
'batch': {
Expand All @@ -87,16 +94,82 @@ def test_batch_fake_source_from_query_path(self, tmp_path):
},
'context': {
'fetcher_parameters': {
'data_location': str(fake_data),
'n_rows': 1,
},
'writer': 'csv',
'writer_parameters': {'destination_folder': str(tmp_path)},
},
}
response = client.post('/api/execute:batch', json=request)
assert response.status_code == fastapi.status.HTTP_200_OK
expected_output = {
f'[CSV] - at {tmp_path}/query1.csv',
f'[CSV] - at {tmp_path}/query2.csv',
expected_results = ['query1', 'query2']
expected_full_results = {
'query1': f'[CSV] - at {tmp_path}/query1.csv',
'query2': f'[CSV] - at {tmp_path}/query2.csv',
}
assert response.json().get('results') == expected_results
assert response.json().get('full_results') == expected_full_results

def test_batch_returns_report_results_with_no_writer(self, tmp_path):
query_path1 = tmp_path / 'query1.sql'
with pathlib.Path.open(query_path1, 'w', encoding='utf-8') as f:
f.write(self.query)
request = {
'source': 'fake',
'batch': {
'query1': self.query,
},
'context': {
'fetcher_parameters': {
'n_rows': 1,
},
},
}
response = client.post('/api/execute:batch', json=request)
assert response.status_code == fastapi.status.HTTP_200_OK
expected_results = ['query1']
assert response.json().get('results') == expected_results
expected_report_columns = ['resource', 'name', 'clicks']
for query, result in response.json().get('full_results').items():
assert list(result[0].keys()) == expected_report_columns

def test_workflow_from_file(self):
workflow_path = _SCRIPT_PATH / 'test_workflow.yaml'
request = {
'workflow_path': str(workflow_path),
}
response = client.post('/api/execute:workflow', params=request)
assert response.status_code == fastapi.status.HTTP_200_OK
expected_response = {
'1-fake-test': {
'results': ['query.sql', 'test_query'],
'full_results': {
'query.sql': '[Console] - query',
'test_query': '[Console] - test_query',
},
},
}
assert response.json() == expected_response

def test_workflow_from_file_no_writer(self):
workflow_path = _SCRIPT_PATH / 'test_workflow_no_writer.yaml'
request = {
'workflow_path': str(workflow_path),
}
response = client.post('/api/execute:workflow', params=request)
assert response.status_code == fastapi.status.HTTP_200_OK
expected_response = {
'1-fake-test': {
'results': ['test_query'],
'full_results': {
'test_query': [
{
'resource': 'Campaign A',
'name': 'Ad Group 1',
'clicks': 1500,
},
],
},
},
}
assert set(response.json().get('results')) == expected_output
assert response.json() == expected_response
Loading
Loading