Skip to content

Commit 292891b

Browse files
Fix #24886: CSV OOM Issue (#24905)
* fix: OOM issue when processing large CSV files by allowing a maximum of 5 chunks * Revert "fix: OOM issue when processing large CSV files by allowing a maximum of 5 chunks" This reverts commit aa8dda1. * fix: use iterator to avoid OOM issues for large files * fix: parquet and datalake tests * fix: optimized the file reading code for all the supported file types * fix: s3fs version and json file reading * test: added and fixed file reading tests * fix: generate close on read first chunk * refactor: remove iterator instance check from tests * refactor(datalake): Convert DataFrame API from list-based to generator-based Refactors the pandas profiler and datalake readers to use lazy generator-based DataFrame iteration instead of loading entire datasets into memory. This improves memory efficiency for large files and enables streaming processing. ## Core API Changes ### DataFrame Readers (dataframes now returns callable) - base.py: Updated read_first_chunk() to handle callable dataframes - parquet.py: All storage backends (S3, GCS, Azure, Local) return callables - dsv.py: CSV/TSV readers return callables - json.py: JSON/JSONL readers return callables - avro.py: All storage backends return callables (fixed S3 reader) - mf4.py: Fixed empty case to return callable ### Profiler Interface - profiler_interface.py: Added _type_casted_dataset() wrapper that applies type casting lazily; dataset is now a callable generator factory - runner.py: Updated PandasRunner to work with callable dataset ### Metrics (updated to iterate over dataset) - All static metrics (count, min, max, sum, mean, stddev, etc.) 
- All window metrics (median, first_quartile, third_quartile) - Hybrid metrics (histogram, cardinality_distribution) ### Data Quality Validators - pandas_validator_mixin.py: Updated to iterate over dataframes - base_test_handler.py: Handle callable dataframes - Table validators: tableColumnCount*, tableColumnNameToExist, tableColumnToMatchSet, tableCustomSQLQuery ### Utilities - datalake_utils.py: fetch_dataframe() and fetch_dataframe_first_chunk() now call the dataframes callable before iterating - pandas_mixin.py: Updated partitioning logic - sampler.py: Updated DatalakeSampler for generator-based dataset ## Test Updates - test_profiler_interface.py, test_profiler.py, test_sample.py, test_custom_metrics.py, test_datalake_metrics.py: Mock get_dataframes() - test_*_reader.py: Use list(result.dataframes()) pattern - test_parquet_azure_reader.py: Consume generator before mock assertions * style: ran python linting * refactor(datalake): addressed issues after initial review * refactor: add logging on error * fix: sdk validator * style: ran java style check * fix(observability): failing DQ tests * fix: tests related to callable dataframes * fix: failing tests * fix: _stream_json_lines - use file_obj.readline() * fix: pymssql version * fix: ijson version * fix: gitar typehinting comment * fix: removed fetch_dataframe method * fix: setup.py libraries to fix timeout --------- Co-authored-by: TeddyCr <teddy.crepineau@gmail.com>
1 parent 1711b6d commit 292891b

82 files changed

Lines changed: 2830 additions & 827 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

ingestion/setup.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,17 @@
2222
VERSIONS = {
2323
"airflow": "apache-airflow==3.1.5",
2424
"adlfs": "adlfs>=2023.1.0",
25+
"aiobotocore": "aiobotocore~=2.26.0",
2526
"avro": "avro>=1.11.4,<1.12",
26-
"boto3": "boto3>=1.20,<2.0", # No need to add botocore separately. It's a dep from boto3
27+
"boto3": "boto3~=1.41.5",
2728
"geoalchemy2": "GeoAlchemy2~=0.12",
2829
"google-cloud-monitoring": "google-cloud-monitoring>=2.0.0",
2930
"google-cloud-storage": "google-cloud-storage>=1.43.0",
30-
"gcsfs": "gcsfs>=2023.1.0",
31+
"gcsfs": "gcsfs~=2023.12.1",
3132
"great-expectations": "great-expectations~=0.18.0",
3233
"great-expectations-1xx": "great-expectations~=1.0",
3334
"grpc-tools": "grpcio-tools>=1.47.2",
35+
"ijson": "ijson~=3.4",
3436
"msal": "msal~=1.2",
3537
"neo4j": "neo4j~=5.3",
3638
"pandas": "pandas~=2.0.3",
@@ -56,6 +58,7 @@
5658
"mongo": "pymongo~=4.3",
5759
"redshift": "sqlalchemy-redshift==0.8.12",
5860
"snowflake": "snowflake-sqlalchemy~=1.4",
61+
"snowflake-connector": "snowflake-connector-python~=3.18.0",
5962
"elasticsearch8": "elasticsearch8~=8.9.0",
6063
"giturlparse": "giturlparse",
6164
"validators": "validators~=0.22.0",
@@ -66,7 +69,8 @@
6669
"pydoris": "pydoris-custom>=1.0.2,<1.5",
6770
"pyiceberg": "pyiceberg==0.5.1",
6871
"google-cloud-bigtable": "google-cloud-bigtable>=2.0.0",
69-
"pyathena": "pyathena~=3.0",
72+
"pyathena": "pyathena~=3.25.0",
73+
"s3fs": "s3fs~=2023.12.1",
7074
"sqlalchemy-bigquery": "sqlalchemy-bigquery~=1.15.0",
7175
"presidio-analyzer": "presidio-analyzer==2.2.358",
7276
"asammdf": "asammdf~=7.4.5",
@@ -80,12 +84,14 @@
8084
VERSIONS["asammdf"],
8185
VERSIONS["avro"],
8286
VERSIONS["boto3"],
87+
VERSIONS["ijson"],
8388
VERSIONS["pandas"],
8489
VERSIONS["pyarrow"],
8590
VERSIONS["numpy"],
8691
# python-snappy does not work well on 3.11 https://github.com/aio-libs/aiokafka/discussions/931
8792
# Using this as an alternative
8893
"cramjam~=2.7",
94+
"fastavro>=1.2.0",
8995
},
9096
"hive": {
9197
"pure-transport==0.2.0",
@@ -167,7 +173,7 @@
167173
"collate-data-diff>=0.11.9",
168174
"jaraco.functools<4.2.0", # above 4.2 breaks the build
169175
# TODO: Remove one once we have updated datadiff version
170-
"snowflake-connector-python>=3.13.1,<4.0.0",
176+
VERSIONS["snowflake-connector"],
171177
"mysql-connector-python>=8.0.29;python_version<'3.9'",
172178
"mysql-connector-python>=9.1;python_version>='3.9'",
173179
"httpx~=0.28.0",
@@ -233,15 +239,19 @@
233239
VERSIONS["azure-storage-blob"],
234240
VERSIONS["azure-identity"],
235241
VERSIONS["adlfs"],
242+
VERSIONS["aiobotocore"],
236243
*COMMONS["datalake"],
237244
},
238245
"datalake-gcs": {
239246
VERSIONS["google-cloud-monitoring"],
240247
VERSIONS["google-cloud-storage"],
241248
VERSIONS["gcsfs"],
249+
VERSIONS["aiobotocore"],
242250
*COMMONS["datalake"],
243251
},
244252
"datalake-s3": {
253+
VERSIONS["s3fs"],
254+
VERSIONS["aiobotocore"],
245255
*COMMONS["datalake"],
246256
},
247257
"deltalake": {
@@ -252,7 +262,7 @@
252262
"deltalake-storage": {"deltalake>=0.19.0,<0.20"},
253263
"deltalake-spark": {"delta-spark>=3.0.0,<4.0.0", "pyspark==3.5.6"},
254264
"domo": {VERSIONS["pydomo"]},
255-
"doris": {"pydoris==1.0.2"},
265+
"doris": {VERSIONS["pydoris"]},
256266
"druid": {"pydruid>=0.6.5"},
257267
"dynamodb": {VERSIONS["boto3"]},
258268
"elasticsearch": {
@@ -335,7 +345,7 @@
335345
},
336346
"qliksense": {"websocket-client~=1.6.1"},
337347
"presto": {*COMMONS["hive"], DATA_DIFF["presto"]},
338-
"pymssql": {"pymssql~=2.2.0"},
348+
"pymssql": {"pymssql~=2.3.9"},
339349
"quicksight": {VERSIONS["boto3"]},
340350
"redash": {VERSIONS["packaging"]},
341351
"redpanda": {*COMMONS["kafka"]},

ingestion/src/metadata/data_quality/builders/validator_builder.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@
2828
from metadata.utils.importer import import_test_case_class
2929

3030
if TYPE_CHECKING:
31-
from pandas import DataFrame
31+
from metadata.data_quality.interface.pandas.pandas_test_suite_interface import (
32+
PandasRunner,
33+
)
3234

3335

3436
class TestCaseImporter:
@@ -51,15 +53,15 @@ class ValidatorBuilder(TestCaseImporter):
5153

5254
def __init__(
5355
self,
54-
runner: Union[QueryRunner, "DataFrame"],
56+
runner: Union[QueryRunner, "PandasRunner"],
5557
test_case: TestCase,
5658
source_type: SourceType,
5759
entity_type: str,
5860
) -> None:
5961
"""Builder object for SQA validators. This builder is used to create a validator object
6062
6163
Args:
62-
runner (QueryRunner): The runner object
64+
runner (Union[QueryRunner, PandasRunner]): The runner object
6365
test_case (TestCase): The test case object
6466
source_type (SourceType): The source type
6567
entity_type (str): one of COLUMN or TABLE -- fetched from the test definition

ingestion/src/metadata/data_quality/validations/base_test_handler.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,11 @@
4343
from metadata.generated.schema.tests.dimensionResult import DimensionResult
4444
from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue
4545
from metadata.generated.schema.type.basic import Timestamp
46-
from metadata.profiler.processor.runner import QueryRunner
46+
from metadata.profiler.processor.runner import PandasRunner, QueryRunner
4747
from metadata.utils.logger import test_suite_logger
4848
from metadata.utils.sqa_like_column import SQALikeColumn
4949

5050
if TYPE_CHECKING:
51-
from pandas import DataFrame
5251
from sqlalchemy import Column
5352

5453
logger = test_suite_logger()
@@ -105,7 +104,7 @@ class BaseTestValidator(ABC):
105104

106105
def __init__(
107106
self,
108-
runner: Union[QueryRunner, List["DataFrame"]],
107+
runner: Union[QueryRunner, PandasRunner],
109108
test_case: TestCase,
110109
execution_date: Timestamp,
111110
) -> None:
@@ -148,10 +147,7 @@ def run_validation(self) -> TestCaseResult:
148147
)
149148
logger.debug(f"Dimension columns: {self.test_case.dimensionColumns}")
150149

151-
# Validate dimension columns exist in the target table
152150
if not self.are_dimension_columns_valid():
153-
# Don't abort the main test, just skip dimensional validation
154-
# The main test result is still valid
155151
return test_result
156152

157153
try:

ingestion/src/metadata/data_quality/validations/column/pandas/columnValueLengthsToBeBetween.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ def _execute_dimensional_validation(
8989
dimension_results = []
9090

9191
try:
92-
dfs = self.runner if isinstance(self.runner, list) else [self.runner]
92+
dfs = self.runner
9393
min_impl = Metrics.MIN_LENGTH(column).get_pandas_computation()
9494
max_impl = Metrics.MAX_LENGTH(column).get_pandas_computation()
9595
row_count_impl = Metrics.ROW_COUNT().get_pandas_computation()

ingestion/src/metadata/data_quality/validations/column/pandas/columnValueMaxToBeBetween.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ def _execute_dimensional_validation(
8787
dimension_results = []
8888

8989
try:
90-
dfs = self.runner if isinstance(self.runner, list) else [self.runner]
90+
dfs = self.runner
9191
max_impl = Metrics.MAX(column).get_pandas_computation()
9292

9393
dimension_aggregates = defaultdict(

ingestion/src/metadata/data_quality/validations/column/pandas/columnValueMeanToBeBetween.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ def _execute_dimensional_validation(
9191
dimension_results = []
9292

9393
try:
94-
dfs = self.runner if isinstance(self.runner, list) else [self.runner]
94+
dfs = self.runner
9595
mean_impl = Metrics.MEAN(column).get_pandas_computation()
9696

9797
dimension_aggregates = defaultdict(

ingestion/src/metadata/data_quality/validations/column/pandas/columnValueMedianToBeBetween.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ def _execute_dimensional_validation(
9292
dimension_results = []
9393

9494
try:
95-
dfs = self.runner if isinstance(self.runner, list) else [self.runner]
95+
dfs = self.runner
9696
median_impl = Metrics.MEDIAN(column).get_pandas_computation()
9797

9898
dimension_aggregates = defaultdict(

ingestion/src/metadata/data_quality/validations/column/pandas/columnValueMinToBeBetween.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ def _execute_dimensional_validation(
8888
dimension_results = []
8989

9090
try:
91-
dfs = self.runner if isinstance(self.runner, list) else [self.runner]
91+
dfs = self.runner
9292
min_impl = Metrics.MIN(column).get_pandas_computation()
9393

9494
dimension_aggregates = defaultdict(

ingestion/src/metadata/data_quality/validations/column/pandas/columnValueStdDevToBeBetween.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ def _execute_dimensional_validation(
9494
dimension_results = []
9595

9696
try:
97-
dfs = self.runner if isinstance(self.runner, list) else [self.runner]
97+
dfs = self.runner
9898
stddev_impl = Metrics.STDDEV(column).get_pandas_computation()
9999
row_count_impl = Metrics.ROW_COUNT().get_pandas_computation()
100100

ingestion/src/metadata/data_quality/validations/column/pandas/columnValuesMissingCount.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ def _execute_dimensional_validation(
8989
dimension_results = []
9090

9191
try:
92-
dfs = self.runner if isinstance(self.runner, list) else [self.runner]
92+
dfs = self.runner
9393

9494
metric_expressions = {
9595
Metrics.NULL_MISSING_COUNT.name: Metrics.NULL_MISSING_COUNT(

0 commit comments

Comments
 (0)