Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions elementary/monitor/api/tests/tests.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import re
import statistics
from collections import defaultdict
from typing import DefaultDict, Dict, List, Optional, Union, cast
from typing import DefaultDict, Dict, List, Optional, Union

from dateutil import tz

Expand Down Expand Up @@ -445,6 +445,30 @@ def _parse_test_db_row(cls, test_db_row: TestDBRowSchema) -> TestSchema:
),
)

@staticmethod
def _normalize_results_sample(
    sample_data: Optional[Union[dict, list]],
) -> Optional[list]:
    """Return the sample rows contained in ``sample_data`` as a list.

    Two payload shapes are handled:

    - a list, which is returned unchanged (same object);
    - a dict, in which case the keys "rows", "sample_rows",
      "results_sample" and "data" are probed in that order and the first
      value that is itself a list is returned.

    Anything else (``None``, a dict without a list under one of those keys,
    or any other type) yields ``None``.

    Background on the dict-wrapped shape:
    https://github.com/elementary-data/elementary/issues/2269
    """
    if isinstance(sample_data, list):
        return sample_data
    if not isinstance(sample_data, dict):
        return None

    wrapper_keys = ("rows", "sample_rows", "results_sample", "data")
    # Lazy lookup: stops probing as soon as a list-valued key is found.
    candidates = (sample_data.get(name) for name in wrapper_keys)
    return next((rows for rows in candidates if isinstance(rows, list)), None)

@staticmethod
def _get_test_result_from_test_result_db_row(
test_result_db_row: TestResultDBRowSchema,
Expand All @@ -453,8 +477,9 @@ def _get_test_result_from_test_result_db_row(
test_results: Optional[Union[DbtTestResultSchema, ElementaryTestResultSchema]]

if test_result_db_row.test_type == "dbt_test":
# Sample data is always a list for non-elementary tests
sample_data = cast(Optional[list], test_result_db_row.sample_data)
sample_data = TestsAPI._normalize_results_sample(
test_result_db_row.sample_data
)
if disable_samples:
sample_data = None

Expand Down
62 changes: 62 additions & 0 deletions tests/mocks/fetchers/tests_fetcher_mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,68 @@ def get_all_test_results_db_rows(self, *args, **kwargs):
days_diff=1,
invocations_rank_index=1,
),
# Row with list-shaped sample_data — should populate results_sample directly.
TestResultDBRowSchema(
id="mock_id_7",
model_unique_id="model_id_1",
test_unique_id="test_id_5",
elementary_unique_id="test_id_5.generic",
detected_at="2023-01-02 10:00:00",
database_name="test_db",
schema_name="test_schema",
table_name="table",
column_name="column",
test_type="dbt_test",
test_sub_type="generic",
test_results_description="Got 3 results, configured to fail if != 0",
original_path="tests/elementary/tests/test_elementary.py",
owners='["Jeff"]',
tags='["awesome-o"]',
meta="{}",
model_meta="{}",
test_results_query="select * from table",
other=None,
test_name="The test list sample",
test_params="{}",
severity="ERROR",
status="fail",
test_created_at="2023-01-01 09:00:00",
days_diff=1,
invocations_rank_index=1,
sample_data=[{"id": 1, "val": "a"}, {"id": 2, "val": "b"}, {"id": 3, "val": "c"}],
),
# Row with dict-shaped sample_data — regression case for issue #2269.
# Some dbt-core / elementary package version combinations return sample rows
# nested under a "rows" key rather than as a bare list.
TestResultDBRowSchema(
id="mock_id_8",
model_unique_id="model_id_1",
test_unique_id="test_id_6",
elementary_unique_id="test_id_6.generic",
detected_at="2023-01-02 10:00:00",
database_name="test_db",
schema_name="test_schema",
table_name="table",
column_name="column",
test_type="dbt_test",
test_sub_type="generic",
test_results_description="Got 2 results, configured to fail if != 0",
original_path="tests/elementary/tests/test_elementary.py",
owners='["Jeff"]',
tags='["awesome-o"]',
meta="{}",
model_meta="{}",
test_results_query="select * from table",
other=None,
test_name="The test dict sample",
test_params="{}",
severity="ERROR",
status="fail",
test_created_at="2023-01-01 09:00:00",
days_diff=1,
invocations_rank_index=1,
sample_data={"rows": [{"id": 10, "val": "x"}, {"id": 11, "val": "y"}]},
),
]
return [*ELEMENTARY_TEST_RESULT_DB_ROWS, *DBT_TEST_RESULT_DB_ROWS]

Expand Down
162 changes: 162 additions & 0 deletions tests/unit/monitor/api/tests/test_tests_api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import pytest

from elementary.monitor.fetchers.tests.schema import TestResultDBRowSchema
from tests.mocks.api.tests_api_mock import MockTestsAPI


Expand Down Expand Up @@ -67,3 +68,164 @@ def test_parse_test_db_row(
@pytest.fixture
def tests_api_mock() -> MockTestsAPI:
    """Build a ``MockTestsAPI`` instance for tests that request this fixture."""
    api = MockTestsAPI()
    return api


# ---------------------------------------------------------------------------
# _normalize_results_sample
# ---------------------------------------------------------------------------


def test_normalize_results_sample_with_list():
    """A list payload comes back equal to what was passed in."""
    sample_rows = [{"id": 1}, {"id": 2}]
    assert MockTestsAPI._normalize_results_sample(sample_rows) == sample_rows


def test_normalize_results_sample_with_dict_rows_key():
    """A dict wrapping the rows under 'rows' is unwrapped (issue #2269)."""
    expected = [{"id": 10}, {"id": 11}]
    wrapped = {"rows": expected}
    assert MockTestsAPI._normalize_results_sample(wrapped) == expected


@pytest.mark.parametrize("key", ["sample_rows", "results_sample", "data"])
def test_normalize_results_sample_with_dict_other_keys(key):
    """Each alternative wrapper key is unwrapped to the list it holds."""
    expected = [{"id": 1}]
    payload = {key: expected}
    normalized = MockTestsAPI._normalize_results_sample(payload)
    assert normalized == expected


def test_normalize_results_sample_with_none():
    """A ``None`` payload normalizes to ``None``."""
    normalized = MockTestsAPI._normalize_results_sample(None)
    assert normalized is None


def test_normalize_results_sample_with_unknown_dict():
    """A dict lacking every recognized wrapper key normalizes to ``None``."""
    unrecognized = {"unknown_key": [{"id": 1}]}
    assert MockTestsAPI._normalize_results_sample(unrecognized) is None


# ---------------------------------------------------------------------------
# _get_test_result_from_test_result_db_row – results_sample population
# ---------------------------------------------------------------------------


def _make_dbt_row(**overrides) -> TestResultDBRowSchema:
    """Build a ``TestResultDBRowSchema`` for a failing generic dbt test.

    Every field has a baseline value below; any keyword argument passed in
    ``overrides`` replaces (or adds to) the baseline before the schema
    object is constructed.
    """
    baseline = {
        "id": "r1",
        "model_unique_id": "model_id_1",
        "test_unique_id": "test_id_x",
        "elementary_unique_id": "test_id_x.generic",
        "detected_at": "2023-01-01 10:00:00",
        "database_name": "db",
        "schema_name": "schema",
        "table_name": "table",
        "column_name": "col",
        "test_type": "dbt_test",
        "test_sub_type": "generic",
        "test_results_description": "Got 2 results, configured to fail if != 0",
        "test_description": None,
        "original_path": "models/test.sql",
        "owners": "[]",
        "model_owner": "[]",
        "tags": "[]",
        "test_tags": "[]",
        "model_tags": "[]",
        "meta": "{}",
        "model_meta": "{}",
        "test_results_query": None,
        "other": None,
        "test_name": "my_test",
        "test_params": "{}",
        "severity": "ERROR",
        "status": "fail",
        "test_created_at": "2023-01-01 09:00:00",
        "days_diff": 1,
        "invocations_rank_index": 1,
        "sample_data": None,
        "failures": 2,
        "package_name": None,
        "execution_time": None,
        "invocation_id": None,
        "test_execution_id": None,
    }
    # Overrides win over the baseline on key collisions.
    return TestResultDBRowSchema(**{**baseline, **overrides})


def test_results_sample_populated_from_list_sample_data():
    """A dbt row whose sample_data is a list exposes it as results_sample."""
    expected = [{"col": "bad_val_1"}, {"col": "bad_val_2"}]
    db_row = _make_dbt_row(sample_data=expected)
    parsed = MockTestsAPI._get_test_result_from_test_result_db_row(db_row)
    assert parsed is not None
    assert parsed.results_sample == expected


def test_results_sample_populated_from_dict_sample_data():
    """A dbt row whose sample_data wraps rows under 'rows' still yields them (#2269)."""
    expected = [{"col": "bad_val_1"}, {"col": "bad_val_2"}]
    db_row = _make_dbt_row(sample_data={"rows": expected})
    parsed = MockTestsAPI._get_test_result_from_test_result_db_row(db_row)
    assert parsed is not None
    assert parsed.results_sample == expected


def test_results_sample_empty_when_none():
    """With sample_data=None the parsed result carries no results_sample."""
    db_row = _make_dbt_row(sample_data=None)
    parsed = MockTestsAPI._get_test_result_from_test_result_db_row(db_row)
    assert parsed is not None
    assert parsed.results_sample is None


def test_results_sample_suppressed_when_disable_samples():
    """disable_samples=True drops list-shaped sample rows from the result."""
    db_row = _make_dbt_row(sample_data=[{"col": "bad_val"}])
    parsed = MockTestsAPI._get_test_result_from_test_result_db_row(
        db_row, disable_samples=True
    )
    assert parsed is not None
    assert parsed.results_sample is None


def test_results_sample_suppressed_for_dict_sample_data_when_disable_samples():
    """disable_samples=True drops sample rows for a dict-shaped payload too."""
    wrapped = {"rows": [{"col": "bad_val"}]}
    db_row = _make_dbt_row(sample_data=wrapped)
    parsed = MockTestsAPI._get_test_result_from_test_result_db_row(
        db_row, disable_samples=True
    )
    assert parsed is not None
    assert parsed.results_sample is None


# ---------------------------------------------------------------------------
# Integration: get_test_results includes sample rows from dict-shaped payload
# ---------------------------------------------------------------------------


def test_get_test_results_includes_dict_sample_rows(tests_api_mock: MockTestsAPI):
    """Check that both list- and dict-shaped mock samples surface in get_test_results.

    Looks up the results whose test_unique_id is "test_id_5" (list sample)
    and "test_id_6" (dict sample) and compares their results_sample with the
    expected rows.
    """
    grouped = tests_api_mock.get_test_results(invocation_id=None)

    # Collapse the per-key result collections into one flat list.
    flat_results = []
    for group in grouped.values():
        flat_results.extend(group)

    def first_with_test_id(test_unique_id):
        # First result carrying the given test_unique_id, or None if absent.
        for candidate in flat_results:
            if candidate.metadata.test_unique_id == test_unique_id:
                return candidate
        return None

    list_sample_result = first_with_test_id("test_id_5")
    dict_sample_result = first_with_test_id("test_id_6")

    assert list_sample_result is not None, "mock row test_id_5 (list sample) not found"
    assert dict_sample_result is not None, "mock row test_id_6 (dict sample) not found"

    expected_list_rows = [
        {"id": 1, "val": "a"},
        {"id": 2, "val": "b"},
        {"id": 3, "val": "c"},
    ]
    expected_dict_rows = [
        {"id": 10, "val": "x"},
        {"id": 11, "val": "y"},
    ]
    assert list_sample_result.test_results.results_sample == expected_list_rows
    assert dict_sample_result.test_results.results_sample == expected_dict_rows
Loading