Skip to content

Commit 1044f08

Browse files
thiagodeschamps, cursoragent, TeddyCr
authored and committed
fix(dbt): pick latest run_results entry when duplicate unique_ids exist (#25996)
When multiple run_results files are present (e.g. split by domain or from hourly partial runs), the same unique_id can appear in more than one file. Previously, `add_dbt_tests` used `next()` which picked the first match — the order being non-deterministic (depends on S3 listing). This change introduces `_get_latest_result` which collects all matches and returns the one with the most recent `execute` completed_at timestamp. Falls back to the first match if timestamps are unavailable. Co-authored-by: Cursor <cursoragent@cursor.com> Co-authored-by: Teddy <teddy.crepineau@gmail.com>
1 parent a4f44d5 commit 1044f08

2 files changed

Lines changed: 143 additions & 9 deletions

File tree

ingestion/src/metadata/ingestion/source/database/dbt/metadata.py

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,45 @@ def yield_dbt_tags(
656656
)
657657
)
658658

659+
@staticmethod
def _get_latest_result(dbt_objects: DbtObjects, key: str):
    """
    Return the most recent run result recorded for ``key``.

    When multiple run_results files are present (e.g. split by domain),
    the same unique_id may appear in more than one file. The match whose
    ``execute`` timing has the latest ``completed_at`` wins, so that
    OpenMetadata always reflects the latest test state.

    Args:
        dbt_objects: container whose ``dbt_run_results`` is an iterable of
            run-result artifacts, each exposing a ``results`` iterable.
        key: the dbt ``unique_id`` to look up.

    Returns:
        The matching result with the latest ``execute`` completion time;
        the first match when no match carries a usable timestamp; ``None``
        when nothing matches ``key``.
    """
    # Function-scope import keeps this block self-contained; only the
    # ``datetime`` class is known to be available at file level.
    from datetime import timezone  # pylint: disable=import-outside-toplevel

    matches = [
        item
        for run_result in dbt_objects.dbt_run_results
        for item in run_result.results
        if item.unique_id == key
    ]
    if not matches:
        return None
    if len(matches) == 1:
        return matches[0]

    def _execute_completed_at(result):
        """Return the ``execute`` completion time as an aware datetime, or None."""
        for timing in result.timing or []:
            if timing.name != "execute" or not timing.completed_at:
                continue
            completed = timing.completed_at
            if isinstance(completed, str):
                try:
                    completed = datetime.strptime(
                        completed, DBT_RUN_RESULT_DATE_FORMAT
                    )
                except ValueError:
                    return None
            if not isinstance(completed, datetime):
                # An unknown type cannot be ordered against datetimes safely.
                return None
            if completed.utcoffset() is None:
                # strptime() yields naive values while artifact parsers may
                # yield aware ones; comparing the two raises TypeError.
                # NOTE(review): naive values are assumed to be UTC - confirm.
                completed = completed.replace(tzinfo=timezone.utc)
            return completed
        return None

    timestamped = [(r, _execute_completed_at(r)) for r in matches]
    with_ts = [(r, ts) for r, ts in timestamped if ts is not None]
    if with_ts:
        # max() keeps the first of equal timestamps, i.e. the earliest match.
        return max(with_ts, key=lambda pair: pair[1])[0]
    return matches[0]
697+
659698
def add_dbt_tests(
660699
self, key: str, manifest_node, manifest_entities, dbt_objects: DbtObjects
661700
) -> None:
@@ -668,15 +707,9 @@ def add_dbt_tests(
668707
self.context.get().dbt_tests[key][
669708
DbtCommonEnum.UPSTREAM.value
670709
] = self.parse_upstream_nodes(manifest_entities, manifest_node)
671-
self.context.get().dbt_tests[key][DbtCommonEnum.RESULTS.value] = next(
672-
(
673-
item
674-
for run_result in dbt_objects.dbt_run_results
675-
for item in run_result.results
676-
if item.unique_id == key
677-
),
678-
None,
679-
)
710+
self.context.get().dbt_tests[key][
711+
DbtCommonEnum.RESULTS.value
712+
] = self._get_latest_result(dbt_objects, key)
680713

681714
def add_dbt_exposure(self, key: str, manifest_node, manifest_entities):
682715
exposure_entity = self.parse_exposure_node(manifest_node)

ingestion/tests/unit/test_dbt.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2585,3 +2585,104 @@ def test_download_dbt_files_with_all_artifacts(self):
25852585
self.assertIsNotNone(result[0].dbt_catalog)
25862586
self.assertIsNotNone(result[0].dbt_run_results)
25872587
self.assertIsNotNone(result[0].dbt_sources)
2588+
2589+
2590+
class TestGetLatestResult(TestCase):
    """
    Verify that _get_latest_result resolves a unique_id that shows up in
    several run_results files to the entry whose execute completed_at is
    the most recent.
    """

    _TEST_ID = "test.pkg.my_test"
    _MORNING = "2026-02-12T10:00:00.000000Z"
    _AFTERNOON = "2026-02-12T14:00:00.000000Z"

    @staticmethod
    def _make_result(unique_id, completed_at, status="pass"):
        """Build a mocked run result holding a single ``execute`` timing."""
        execute_timing = MagicMock(completed_at=completed_at)
        # ``name`` is reserved by the Mock constructor, so assign it afterwards.
        execute_timing.name = "execute"
        return MagicMock(
            unique_id=unique_id,
            timing=[execute_timing],
            status=MagicMock(value=status),
        )

    @staticmethod
    def _make_dbt_objects(run_results_list):
        """Wrap each list of results in its own mocked run_results file."""
        files = [MagicMock(results=results) for results in run_results_list]
        return MagicMock(dbt_run_results=files)

    @staticmethod
    def _latest(dbt_objects, key):
        """Import DbtSource lazily and run the lookup under test."""
        from metadata.ingestion.source.database.dbt.metadata import DbtSource

        return DbtSource._get_latest_result(dbt_objects, key)

    def test_single_match_returned(self):
        only = self._make_result(self._TEST_ID, self._MORNING)
        objects = self._make_dbt_objects([[only]])

        self.assertIs(self._latest(objects, self._TEST_ID), only)

    def test_no_match_returns_none(self):
        unrelated = self._make_result("test.pkg.other", self._MORNING)
        objects = self._make_dbt_objects([[unrelated]])

        self.assertIsNone(self._latest(objects, "test.pkg.missing"))

    def test_picks_latest_across_files(self):
        earlier = self._make_result(self._TEST_ID, self._MORNING, "pass")
        later = self._make_result(self._TEST_ID, self._AFTERNOON, "fail")
        objects = self._make_dbt_objects([[earlier], [later]])

        self.assertIs(self._latest(objects, self._TEST_ID), later)

    def test_picks_latest_regardless_of_order(self):
        later = self._make_result(self._TEST_ID, self._AFTERNOON, "fail")
        earlier = self._make_result(self._TEST_ID, self._MORNING, "pass")
        objects = self._make_dbt_objects([[later], [earlier]])

        self.assertIs(self._latest(objects, self._TEST_ID), later)

    def test_falls_back_to_first_when_no_timestamps(self):
        first = self._make_result(self._TEST_ID, None, "pass")
        second = self._make_result(self._TEST_ID, None, "fail")
        objects = self._make_dbt_objects([[first], [second]])

        self.assertIs(self._latest(objects, self._TEST_ID), first)

    def test_datetime_objects_handled(self):
        from datetime import datetime

        earlier = self._make_result(
            self._TEST_ID, datetime(2026, 2, 12, 10, 0, 0), "pass"
        )
        later = self._make_result(
            self._TEST_ID, datetime(2026, 2, 12, 14, 0, 0), "fail"
        )
        objects = self._make_dbt_objects([[earlier], [later]])

        self.assertIs(self._latest(objects, self._TEST_ID), later)

0 commit comments

Comments
 (0)