Skip to content

Commit 538b75a

Browse files
ulixius9 and harshach committed
MINOR: Fix dbt OOM on listing files from GCS (#26105)
Co-authored-by: Sriharsha Chintalapani <harshach@users.noreply.github.com>
1 parent 8db2e99 commit 538b75a

2 files changed

Lines changed: 305 additions & 18 deletions

File tree

ingestion/src/metadata/ingestion/source/database/dbt/dbt_config.py

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -351,12 +351,15 @@ def _(config: DbtCloudConfig): # pylint: disable=too-many-locals
351351
raise DBTConfigException(f"Error fetching dbt files from dbt Cloud: {exc}")
352352

353353

354-
def get_blobs_grouped_by_dir(blobs: List[str]) -> Dict[str, List[str]]:
354+
def get_blobs_grouped_by_dir(blobs: Iterable[str]) -> Dict[str, List[str]]:
355355
"""
356356
Method to group the objs by the dir
357357
"""
358358
blob_grouped_by_directory = defaultdict(list)
359+
total_blobs_scanned = 0
360+
total_matched = 0
359361
for blob in blobs:
362+
total_blobs_scanned += 1
360363
subdirectory = os.path.dirname(blob)
361364
blob_file_name = os.path.basename(blob)
362365
# We'll be processing multiple run_result files from a single dir
@@ -368,6 +371,11 @@ def get_blobs_grouped_by_dir(blobs: List[str]) -> Dict[str, List[str]]:
368371
or DBT_SOURCES_FILE_NAME == blob_file_name.lower()
369372
):
370373
blob_grouped_by_directory[subdirectory].append(blob)
374+
total_matched += 1
375+
logger.debug(
376+
f"Scanned {total_blobs_scanned} blobs, found {total_matched} dbt artifacts "
377+
f"across {len(blob_grouped_by_directory)} directories"
378+
)
371379
return blob_grouped_by_directory
372380

373381

@@ -477,8 +485,11 @@ def _(config: DbtS3Config):
477485
if prefix:
478486
kwargs["Prefix"] = prefix if prefix.endswith("/") else f"{prefix}/"
479487

488+
logger.debug(f"Listing S3 objects in s3://{current_bucket}/{prefix or ''}")
480489
try:
481-
s3_objects = list(list_s3_objects(client, **kwargs))
490+
blob_grouped = get_blobs_grouped_by_dir(
491+
blobs=(obj["Key"] for obj in list_s3_objects(client, **kwargs))
492+
)
482493
except Exception as exc:
483494
error_msg = str(exc).lower()
484495
if "nosuchbucket" in error_msg:
@@ -495,10 +506,6 @@ def _(config: DbtS3Config):
495506
f"Failed to list objects in S3 bucket '{current_bucket}': {exc}"
496507
) from exc
497508

498-
blob_grouped = get_blobs_grouped_by_dir(
499-
blobs=[key["Key"] for key in s3_objects]
500-
)
501-
502509
if not blob_grouped:
503510
prefix_path = prefix or ""
504511
logger.warning(
@@ -574,12 +581,16 @@ def _(config: DbtGcsConfig):
574581

575582
for bucket in buckets:
576583
try:
577-
obj_list = list(
578-
client.list_blobs(bucket.name, prefix=prefix if prefix else None)
584+
logger.debug(
585+
f"Listing GCS objects in gs://{bucket.name}/{prefix or ''}"
579586
)
580-
581587
blob_grouped = get_blobs_grouped_by_dir(
582-
blobs=[blob.name for blob in obj_list]
588+
blobs=(
589+
blob.name
590+
for blob in client.list_blobs(
591+
bucket.name, prefix=prefix if prefix else None
592+
)
593+
)
583594
)
584595

585596
if not blob_grouped:
@@ -675,15 +686,14 @@ def _(config: DbtAzureConfig):
675686
for container_client in containers:
676687
container_name = container_client.container_name
677688
try:
678-
if prefix:
679-
blob_list = list(
680-
container_client.list_blobs(name_starts_with=prefix)
681-
)
682-
else:
683-
blob_list = list(container_client.list_blobs())
684-
689+
logger.debug(
690+
f"Listing Azure blobs in container '{container_name}/{prefix or ''}'"
691+
)
692+
blob_iter = container_client.list_blobs(
693+
name_starts_with=prefix if prefix else None
694+
)
685695
blob_grouped = get_blobs_grouped_by_dir(
686-
blobs=[blob.name for blob in blob_list]
696+
blobs=(blob.name for blob in blob_iter)
687697
)
688698

689699
if not blob_grouped:

ingestion/tests/unit/test_dbt.py

Lines changed: 277 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2686,3 +2686,280 @@ def test_datetime_objects_handled(self):
26862686

26872687
got = DbtSource._get_latest_result(dbt_objects, "test.pkg.my_test")
26882688
self.assertIs(got, new_result)
2689+
2690+
2691+
class TestGetBlobsGroupedByDir(TestCase):
2692+
"""
2693+
Test cases for get_blobs_grouped_by_dir to verify streaming support,
2694+
correct filtering, and debug logging.
2695+
"""
2696+
2697+
def test_accepts_generator_input(self):
2698+
"""Test that get_blobs_grouped_by_dir works with a generator (lazy iterable)"""
2699+
from metadata.ingestion.source.database.dbt.dbt_config import (
2700+
get_blobs_grouped_by_dir,
2701+
)
2702+
2703+
def blob_generator():
2704+
yield "project1/manifest.json"
2705+
yield "project1/catalog.json"
2706+
yield "project2/manifest.json"
2707+
2708+
result = get_blobs_grouped_by_dir(blobs=blob_generator())
2709+
2710+
self.assertEqual(len(result), 2)
2711+
self.assertIn("project1", result)
2712+
self.assertIn("project2", result)
2713+
self.assertEqual(len(result["project1"]), 2)
2714+
self.assertEqual(len(result["project2"]), 1)
2715+
2716+
def test_filters_only_dbt_artifacts(self):
2717+
"""Test that non-dbt files are excluded from grouping"""
2718+
from metadata.ingestion.source.database.dbt.dbt_config import (
2719+
get_blobs_grouped_by_dir,
2720+
)
2721+
2722+
blobs = [
2723+
"project1/manifest.json",
2724+
"project1/catalog.json",
2725+
"project1/run_results.json",
2726+
"project1/sources.json",
2727+
"project1/some_other_file.csv",
2728+
"project1/data/large_dataset.parquet",
2729+
"project1/README.md",
2730+
]
2731+
2732+
result = get_blobs_grouped_by_dir(blobs=iter(blobs))
2733+
2734+
self.assertEqual(len(result), 1)
2735+
self.assertEqual(len(result["project1"]), 4)
2736+
2737+
def test_empty_input(self):
2738+
"""Test with empty iterable"""
2739+
from metadata.ingestion.source.database.dbt.dbt_config import (
2740+
get_blobs_grouped_by_dir,
2741+
)
2742+
2743+
result = get_blobs_grouped_by_dir(blobs=iter([]))
2744+
2745+
self.assertEqual(len(result), 0)
2746+
2747+
def test_groups_by_directory(self):
2748+
"""Test that blobs are correctly grouped by their parent directory"""
2749+
from metadata.ingestion.source.database.dbt.dbt_config import (
2750+
get_blobs_grouped_by_dir,
2751+
)
2752+
2753+
blobs = [
2754+
"team_a/dbt_project/manifest.json",
2755+
"team_a/dbt_project/catalog.json",
2756+
"team_b/analytics/manifest.json",
2757+
"team_b/analytics/run_results.json",
2758+
"team_c/models/manifest.json",
2759+
]
2760+
2761+
result = get_blobs_grouped_by_dir(blobs=iter(blobs))
2762+
2763+
self.assertEqual(len(result), 3)
2764+
self.assertEqual(len(result["team_a/dbt_project"]), 2)
2765+
self.assertEqual(len(result["team_b/analytics"]), 2)
2766+
self.assertEqual(len(result["team_c/models"]), 1)
2767+
2768+
def test_logs_scan_statistics(self):
2769+
"""Test that debug log includes correct scan statistics"""
2770+
from metadata.ingestion.source.database.dbt.dbt_config import (
2771+
get_blobs_grouped_by_dir,
2772+
)
2773+
2774+
blobs = [
2775+
"dir1/manifest.json",
2776+
"dir1/catalog.json",
2777+
"dir1/random_file.txt",
2778+
"dir2/manifest.json",
2779+
"dir2/another_file.csv",
2780+
]
2781+
2782+
with patch(
2783+
"metadata.ingestion.source.database.dbt.dbt_config.logger"
2784+
) as mock_logger:
2785+
get_blobs_grouped_by_dir(blobs=iter(blobs))
2786+
2787+
mock_logger.debug.assert_called_once()
2788+
log_message = mock_logger.debug.call_args[0][0]
2789+
self.assertIn("Scanned 5 blobs", log_message)
2790+
self.assertIn("found 3 dbt artifacts", log_message)
2791+
self.assertIn("2 directories", log_message)
2792+
2793+
def test_does_not_materialize_generator(self):
2794+
"""Test that the generator is consumed lazily, not converted to a list"""
2795+
from metadata.ingestion.source.database.dbt.dbt_config import (
2796+
get_blobs_grouped_by_dir,
2797+
)
2798+
2799+
consumed_items = []
2800+
2801+
def tracking_generator():
2802+
for item in [
2803+
"dir/manifest.json",
2804+
"dir/other.txt",
2805+
"dir/catalog.json",
2806+
]:
2807+
consumed_items.append(item)
2808+
yield item
2809+
2810+
get_blobs_grouped_by_dir(blobs=tracking_generator())
2811+
2812+
# All items should be consumed (the function fully iterates)
2813+
# but they should not all be held in memory simultaneously as a list
2814+
self.assertEqual(len(consumed_items), 3)
2815+
2816+
def test_multiple_run_results_matched(self):
2817+
"""Test that multiple run_results files (with different prefixes) are matched"""
2818+
from metadata.ingestion.source.database.dbt.dbt_config import (
2819+
get_blobs_grouped_by_dir,
2820+
)
2821+
2822+
blobs = [
2823+
"project/run_results.json",
2824+
"project/run_results_2024.json",
2825+
"project/run_results_latest.json",
2826+
]
2827+
2828+
result = get_blobs_grouped_by_dir(blobs=iter(blobs))
2829+
2830+
self.assertEqual(len(result["project"]), 3)
2831+
2832+
2833+
class TestStorageStreamingBehavior(TestCase):
2834+
"""
2835+
Test cases to verify that GCS, S3, and Azure handlers pass lazy
2836+
iterables to get_blobs_grouped_by_dir (not materialized lists).
2837+
"""
2838+
2839+
@patch("metadata.ingestion.source.database.dbt.dbt_config.download_dbt_files")
2840+
@patch("metadata.ingestion.source.database.dbt.dbt_config.get_blobs_grouped_by_dir")
2841+
@patch("metadata.ingestion.source.database.dbt.dbt_config.set_google_credentials")
2842+
def test_gcs_passes_generator_to_grouping(
2843+
self, mock_set_creds, mock_get_blobs, mock_download
2844+
):
2845+
"""Test that GCS handler passes a generator (not a list) to get_blobs_grouped_by_dir"""
2846+
from types import GeneratorType
2847+
2848+
from metadata.generated.schema.metadataIngestion.dbtconfig.dbtGCSConfig import (
2849+
DbtGcsConfig,
2850+
)
2851+
from metadata.ingestion.source.database.dbt.dbt_config import get_dbt_details
2852+
2853+
# Get the registered handler for DbtGcsConfig directly
2854+
gcs_handler = get_dbt_details.dispatch(DbtGcsConfig)
2855+
2856+
mock_blob_1 = MagicMock()
2857+
mock_blob_1.name = "project/manifest.json"
2858+
mock_blob_2 = MagicMock()
2859+
mock_blob_2.name = "project/catalog.json"
2860+
2861+
mock_bucket = MagicMock()
2862+
mock_bucket.name = "test-bucket"
2863+
2864+
mock_client = MagicMock()
2865+
mock_client.get_bucket.return_value = mock_bucket
2866+
mock_client.list_blobs.return_value = iter([mock_blob_1, mock_blob_2])
2867+
2868+
mock_get_blobs.return_value = {}
2869+
mock_download.return_value = iter([])
2870+
2871+
config = MagicMock()
2872+
config.dbtPrefixConfig.dbtBucketName = "test-bucket"
2873+
config.dbtPrefixConfig.dbtObjectPrefix = None
2874+
2875+
with patch("google.cloud.storage.Client", return_value=mock_client):
2876+
list(gcs_handler(config))
2877+
2878+
# Verify get_blobs_grouped_by_dir was called
2879+
mock_get_blobs.assert_called_once()
2880+
# Verify the blobs argument is a generator, not a list
2881+
blobs_arg = mock_get_blobs.call_args[1]["blobs"]
2882+
self.assertIsInstance(blobs_arg, GeneratorType)
2883+
2884+
@patch("metadata.ingestion.source.database.dbt.dbt_config.download_dbt_files")
2885+
@patch("metadata.ingestion.source.database.dbt.dbt_config.get_blobs_grouped_by_dir")
2886+
@patch("metadata.ingestion.source.database.dbt.dbt_config.list_s3_objects")
2887+
def test_s3_passes_generator_to_grouping(
2888+
self, mock_list_s3, mock_get_blobs, mock_download
2889+
):
2890+
"""Test that S3 handler passes a generator (not a list) to get_blobs_grouped_by_dir"""
2891+
from types import GeneratorType
2892+
2893+
from metadata.generated.schema.metadataIngestion.dbtconfig.dbtS3Config import (
2894+
DbtS3Config,
2895+
)
2896+
from metadata.ingestion.source.database.dbt.dbt_config import get_dbt_details
2897+
2898+
# Get the registered handler for DbtS3Config directly
2899+
s3_handler = get_dbt_details.dispatch(DbtS3Config)
2900+
2901+
mock_list_s3.return_value = iter(
2902+
[{"Key": "project/manifest.json"}, {"Key": "project/catalog.json"}]
2903+
)
2904+
2905+
mock_get_blobs.return_value = {}
2906+
mock_download.return_value = iter([])
2907+
2908+
config = MagicMock()
2909+
config.dbtPrefixConfig.dbtBucketName = "test-bucket"
2910+
config.dbtPrefixConfig.dbtObjectPrefix = None
2911+
2912+
mock_client = MagicMock()
2913+
2914+
with patch(
2915+
"metadata.ingestion.source.database.dbt.dbt_config.AWSClient"
2916+
) as mock_aws:
2917+
mock_aws.return_value.get_client.return_value = mock_client
2918+
list(s3_handler(config))
2919+
2920+
mock_get_blobs.assert_called_once()
2921+
blobs_arg = mock_get_blobs.call_args[1]["blobs"]
2922+
self.assertIsInstance(blobs_arg, GeneratorType)
2923+
2924+
@patch("metadata.ingestion.source.database.dbt.dbt_config.download_dbt_files")
2925+
@patch("metadata.ingestion.source.database.dbt.dbt_config.get_blobs_grouped_by_dir")
2926+
def test_azure_passes_generator_to_grouping(self, mock_get_blobs, mock_download):
2927+
"""Test that Azure handler passes a generator (not a list) to get_blobs_grouped_by_dir"""
2928+
from types import GeneratorType
2929+
2930+
from metadata.generated.schema.metadataIngestion.dbtconfig.dbtAzureConfig import (
2931+
DbtAzureConfig,
2932+
)
2933+
from metadata.ingestion.source.database.dbt.dbt_config import get_dbt_details
2934+
2935+
# Get the registered handler for DbtAzureConfig directly
2936+
azure_handler = get_dbt_details.dispatch(DbtAzureConfig)
2937+
2938+
mock_blob_1 = MagicMock()
2939+
mock_blob_1.name = "project/manifest.json"
2940+
mock_blob_2 = MagicMock()
2941+
mock_blob_2.name = "project/catalog.json"
2942+
2943+
mock_container = MagicMock()
2944+
mock_container.container_name = "test-container"
2945+
mock_container.list_blobs.return_value = iter([mock_blob_1, mock_blob_2])
2946+
2947+
mock_client = MagicMock()
2948+
mock_client.get_container_client.return_value = mock_container
2949+
2950+
mock_get_blobs.return_value = {}
2951+
mock_download.return_value = iter([])
2952+
2953+
config = MagicMock()
2954+
config.dbtPrefixConfig.dbtBucketName = "test-container"
2955+
config.dbtPrefixConfig.dbtObjectPrefix = None
2956+
2957+
with patch(
2958+
"metadata.ingestion.source.database.dbt.dbt_config.AzureClient"
2959+
) as mock_azure:
2960+
mock_azure.return_value.create_blob_client.return_value = mock_client
2961+
list(azure_handler(config))
2962+
2963+
mock_get_blobs.assert_called_once()
2964+
blobs_arg = mock_get_blobs.call_args[1]["blobs"]
2965+
self.assertIsInstance(blobs_arg, GeneratorType)

0 commit comments

Comments (0)