1+ from concurrent .futures import ProcessPoolExecutor
12import json
3+ from multiprocessing import cpu_count
24from pathlib import Path
35from typing import Any , Dict , List , Tuple
46
3032 temp_xml_file ,
3133)
3234
33- def test_duckdb_data_contract_csv (temp_csv_file ):
@pytest.fixture(scope="module")
def temp_process_pool_executor():
    """Yield a module-scoped ``ProcessPoolExecutor`` for the tests in this module.

    The pool is created once per module and is shut down when the ``with``
    block exits at fixture teardown.

    Yields:
        ProcessPoolExecutor: a pool sized to one fewer than the reported CPU
        count, but never fewer than one worker.
    """
    # cpu_count() - 1 evaluates to 0 on a single-core host, and
    # ProcessPoolExecutor raises ValueError for max_workers <= 0, so clamp
    # the worker count to at least 1.
    worker_count = max(1, cpu_count() - 1)
    with ProcessPoolExecutor(worker_count) as pool:
        yield pool
39+
40+
41+ def test_duckdb_data_contract_csv (temp_csv_file , temp_process_pool_executor ):
3442 uri , _ , _ , mdl = temp_csv_file
3543 connection = default_connection
3644
@@ -89,7 +97,7 @@ def test_duckdb_data_contract_csv(temp_csv_file):
8997 }
9098 entity_locations : Dict [str , URI ] = {"test_ds" : str (uri )}
9199
92- data_contract : DuckDBDataContract = DuckDBDataContract (connection )
100+ data_contract : DuckDBDataContract = DuckDBDataContract (connection , executor = temp_process_pool_executor )
93101 entities , feedback_errors_uri , stage_successful = data_contract .apply_data_contract (get_parent (uri .as_posix ()), entities , entity_locations , dc_meta )
94102 rel : DuckDBPyRelation = entities .get ("test_ds" )
95103 assert dict (zip (rel .columns , rel .dtypes )) == {
@@ -100,7 +108,7 @@ def test_duckdb_data_contract_csv(temp_csv_file):
100108 assert stage_successful
101109
102110
103- def test_duckdb_data_contract_xml (temp_xml_file ):
111+ def test_duckdb_data_contract_xml (temp_xml_file , temp_process_pool_executor ):
104112 uri , header_model , header_data , class_model , class_data = temp_xml_file
105113 connection = default_connection
106114 contract_meta = json .dumps (
@@ -187,7 +195,7 @@ def test_duckdb_data_contract_xml(temp_xml_file):
187195 reporting_fields = {"test_header" : ["school" ], "test_class_info" : ["year" ]},
188196 )
189197
190- data_contract : DuckDBDataContract = DuckDBDataContract (connection )
198+ data_contract : DuckDBDataContract = DuckDBDataContract (connection , executor = temp_process_pool_executor )
191199 entities , feedback_errors_uri , stage_successful = data_contract .apply_data_contract (get_parent (uri .as_posix ()), entities , entity_locations , dc_meta )
192200 header_rel : DuckDBPyRelation = entities .get ("test_header" )
193201 header_expected_schema : Dict [str , DuckDBPyType ] = {
@@ -327,10 +335,11 @@ def test_ddb_data_contract_read_nested_parquet(nested_all_string_parquet):
327335 }
328336
329337def test_duckdb_data_contract_custom_error_details (nested_all_string_parquet_w_errors ,
330- nested_parquet_custom_dc_err_details ):
338+ nested_parquet_custom_dc_err_details ,
339+ temp_process_pool_executor ):
331340 parquet_uri , contract_meta , _ = nested_all_string_parquet_w_errors
332341 connection = default_connection
333- data_contract = DuckDBDataContract (connection )
342+ data_contract = DuckDBDataContract (connection , executor = temp_process_pool_executor )
334343
335344 entity = data_contract .read_parquet (path = parquet_uri )
336345 assert entity .count ("*" ).fetchone ()[0 ] == 2
0 commit comments