
Commit b75c0bc

feat: improved movies dataset test coverage. Added testing for the spark and duckdb refdata loaders when a table config is specified.
Parent: 8805b90

10 files changed: 381 additions & 9 deletions

src/dve/core_engine/backends/implementations/duckdb/reference_data.py

Lines changed: 1 addition & 1 deletion
@@ -42,7 +42,7 @@ def __init__(

     def load_table(self, config: ReferenceTable) -> DuckDBPyRelation:
         """Load reference entity from a database table"""
-        return self.connection.table(f"{config.fq_table_name}")
+        return self.connection.sql(f"select * from {config.fq_table_name}")

     def load_file(self, config: ReferenceFile) -> DuckDBPyRelation:
         "Load reference entity from a relative file path"

src/dve/core_engine/backends/implementations/spark/readers/json.py

Lines changed: 4 additions & 3 deletions
@@ -25,8 +25,8 @@ class SparkJSONReader(BaseFileReader):
     def __init__(
         self,
         *,
-        encoding: Optional[str] = "utf-8-sig",
-        multi_line: Optional[bool] = False,
+        encoding: Optional[str] = "utf-8",
+        multi_line: Optional[bool] = True,
         spark_session: Optional[SparkSession] = None
     ) -> None:

@@ -57,7 +57,8 @@ def read_to_dataframe(

         spark_schema: StructType = get_type_from_annotation(schema)
         kwargs = {
-            "multiLine": self.multi_line,
+            "encoding": self.encoding,
+            "multiline": self.multi_line,

         }
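How these kwargs reach Spark is not shown in the hunk, but they presumably end up as reader options. Spark's JSON reader options are matched case-insensitively, so "multiline" and "multiLine" name the same option. A sketch of the equivalent direct call (path chosen for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# multiLine=True lets Spark parse a JSON document that spans several lines
# (e.g. a pretty-printed array of movie records) instead of requiring
# newline-delimited JSON; encoding names the byte decoding to use.
df = (
    spark.read
    .options(encoding="utf-8", multiLine=True)
    .json("tests/testdata/movies/movies.json")  # illustrative path
)
df.printSchema()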

tests/features/movies.feature

Lines changed: 50 additions & 1 deletion
@@ -7,9 +7,45 @@ Feature: Pipeline tests using the movies dataset
     Some validation of entity attributes is performed: SQL expressions and Python filter
     functions are used, and templatable business rules feature in the transformations.

+  Scenario: Validate and filter movies (spark)
+    Given I submit the movies file movies.json for processing
+    And A spark pipeline is configured
+    And I create the following reference data tables in the database movies_refdata
+      | table_name | parquet_path                                          |
+      | sequels    | tests/testdata/movies/refdata/movies_sequels.parquet |
+    And I add initial audit entries for the submission
+    Then the latest audit record for the submission is marked with processing status file_transformation
+    When I run the file transformation phase
+    Then the movies entity is stored as a parquet after the file_transformation phase
+    And the latest audit record for the submission is marked with processing status data_contract
+    When I run the data contract phase
+    Then there are 3 record rejections from the data_contract phase
+    And there are errors with the following details and associated error_count from the data_contract phase
+      | ErrorCode | ErrorMessage                              | error_count |
+      | BLANKYEAR | year not provided                         | 1           |
+      | DODGYYEAR | year value (NOT_A_NUMBER) is invalid      | 1           |
+      | DODGYDATE | date_joined value is not valid: daft_date | 1           |
+    And the movies entity is stored as a parquet after the data_contract phase
+    And the latest audit record for the submission is marked with processing status business_rules
+    When I run the business rules phase
+    Then The rules restrict "movies" to 4 qualifying records
+    And At least one row from "movies" has generated error code "LIMITED_RATINGS"
+    And At least one row from "derived" has generated error code "RUBBISH_SEQUEL"
+    And the latest audit record for the submission is marked with processing status error_report
+    When I run the error report phase
+    Then An error report is produced
+    And The statistics entry for the submission shows the following information
+      | parameter                | value |
+      | record_count             | 5     |
+      | number_record_rejections | 4     |
+      | number_warnings          | 1     |
+
   Scenario: Validate and filter movies (duckdb)
     Given I submit the movies file movies.json for processing
-    And A duckdb pipeline is configured
+    And A duckdb pipeline is configured with schema file 'movies_ddb.dischema.json'
+    And I create the following reference data tables in the database "movies_refdata"
+      | table_name | parquet_path                                          |
+      | sequels    | tests/testdata/movies/refdata/movies_sequels.parquet |
     And I add initial audit entries for the submission
     Then the latest audit record for the submission is marked with processing status file_transformation
     When I run the file transformation phase

@@ -24,3 +60,16 @@ Feature: Pipeline tests using the movies dataset
       | DODGYDATE | date_joined value is not valid: daft_date | 1 |
     And the movies entity is stored as a parquet after the data_contract phase
     And the latest audit record for the submission is marked with processing status business_rules
+    When I run the business rules phase
+    Then The rules restrict "movies" to 4 qualifying records
+    And At least one row from "movies" has generated error code "LIMITED_RATINGS"
+    And At least one row from "derived" has generated error code "RUBBISH_SEQUEL"
+    And the latest audit record for the submission is marked with processing status error_report
+    When I run the error report phase
+    Then An error report is produced
+    And The statistics entry for the submission shows the following information
+      | parameter                | value |
+      | record_count             | 5     |
+      | number_record_rejections | 4     |
+      | number_warnings          | 1     |
+

tests/features/steps/steps_pipeline.py

Lines changed: 27 additions & 0 deletions
@@ -295,3 +295,30 @@ def check_rows_eq_to_category(context: Context, entity_name: str, category: str)
         (pl.col("Entity").eq(entity_name)) & (pl.col("Category").eq(category))
     ).shape[0]
     assert recs_with_err_code >= 1
+
+@given("I create the following reference data tables in the database {database}")
+def create_refdata_tables(context: Context, database: str):
+    table: Optional[Table] = context.table
+    refdata_tables: Dict[str, URI] = {}
+    row: Row
+    for row in table:
+        record = row.as_dict()
+        refdata_tables[record["table_name"]] = record["parquet_path"]
+    pipeline = ctxt.get_pipeline(context)
+    refdata_loader = getattr(pipeline, "_reference_data_loader")
+    if refdata_loader == SparkRefDataLoader:
+        refdata_loader.spark.sql(f"CREATE DATABASE IF NOT EXISTS {database}")
+        for tbl, source in refdata_tables.items():
+            (refdata_loader.spark.read.parquet(source)
+                .write.saveAsTable(f"{database}.{tbl}"))
+
+    if refdata_loader == DuckDBRefDataLoader:
+        ref_db_file = Path(ctxt.get_processing_location(context), f"{database}.duckdb").as_posix()
+        refdata_loader.connection.sql(f"ATTACH '{ref_db_file}' AS {database}")
+        for tbl, source in refdata_tables.items():
+            refdata_loader.connection.read_parquet(source).to_table(f"{database}.{tbl}")
+
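The DuckDB branch of this step leans on two relational-API conveniences used verbatim above, ATTACH and DuckDBPyRelation.to_table(). A standalone sketch of the same pattern (file and table names invented for illustration):

import duckdb

con = duckdb.connect()

# ATTACH creates (or opens) a separate database file and exposes it under
# an alias, so tables can later be addressed as alias.table_name.
con.sql("ATTACH 'refdata.duckdb' AS movies_refdata")

# read_parquet() yields a DuckDBPyRelation; to_table() materialises it as
# a named table, here inside the attached database.
con.read_parquet("movies_sequels.parquet").to_table("movies_refdata.sequels")

print(con.sql("select count(*) from movies_refdata.sequels").fetchall())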

tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_audit_ddb.py

Lines changed: 2 additions & 2 deletions
@@ -174,7 +174,7 @@ def test_dve_audit_using_thread_pool(ddb_audit_manager_threaded: DDBAuditingMana

     aud.add_new_submissions([_sub_info])
     while not aud.queue.empty():
-        time.sleep(2)
+        time.sleep(0.2)

     at_entry = list(
         aud._processing_status.get_relation()

@@ -188,7 +188,7 @@ def test_dve_audit_using_thread_pool(ddb_audit_manager_threaded: DDBAuditingMana
     assert len(at_entry) == 1
     aud.mark_transform([_sub_info.submission_id])
     while not aud.queue.empty():
-        time.sleep(2)
+        time.sleep(0.2)

     file_trans = aud.get_all_file_transformation_submissions()
     assert [rw.get("submission_id") for rw in file_trans.pl().iter_rows(named=True)] == [
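Dropping the poll interval from 2s to 0.2s speeds the test up, but the loops still spin forever if the audit queue never drains. A timeout-guarded variant (a sketch, not part of this commit) would fail loudly instead of hanging:

import time
from queue import Queue

def wait_for_drain(queue: Queue, timeout: float = 10.0, poll: float = 0.2) -> None:
    """Poll until the queue is empty, raising instead of hanging forever."""
    deadline = time.monotonic() + timeout
    while not queue.empty():
        if time.monotonic() > deadline:
            raise TimeoutError(f"queue still holds ~{queue.qsize()} items after {timeout}s")
        time.sleep(poll)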

tests/testdata/movies/movies.dischema.json

Lines changed: 29 additions & 2 deletions
@@ -30,7 +30,7 @@
     },
     "reader_config": {
         ".json": {
-            "reader": "DuckDBJSONReader"
+            "reader": "SparkJSONReader"
         }
     },
     "mandatory_fields": [

@@ -40,7 +40,34 @@
             }
         }
     },
-    "transformations": {}
+    "transformations": {
+        "parameters": {"entity": "movies"},
+        "reference_data": {
+            "sequels": {
+                "type": "table",
+                "database": "movies_refdata",
+                "table_name": "sequels"
+            }
+        },
+        "rule_stores": [
+            {
+                "store_type": "json",
+                "filename": "movies_spark_rule_store.json"
+            }
+        ],
+        "complex_rules": [
+            {
+                "rule_name": "ratings_count"
+            },
+            {
+                "rule_name": "poor_sequel_check",
+                "parameters": {
+                    "sequel_entity": "refdata_sequels"
+                }
+            }
+        ]
+    }
 }
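The new reference_data block is what exercises the table-loading paths touched above. Presumably the "table" entry is parsed into a config model whose fully qualified name feeds load_table(); a hypothetical sketch of that shape (the real ReferenceTable model lives in the dve codebase and may differ):

from dataclasses import dataclass

# Hypothetical stand-in for the real ReferenceTable config model.
@dataclass
class ReferenceTable:
    database: str
    table_name: str

    @property
    def fq_table_name(self) -> str:
        # "movies_refdata.sequels": the dotted name that the DuckDB loader
        # now reads via connection.sql() and Spark via its catalog.
        return f"{self.database}.{self.table_name}"

sequels = ReferenceTable(database="movies_refdata", table_name="sequels")
assert sequels.fq_table_name == "movies_refdata.sequels"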

tests/testdata/movies/movies_ddb.dischema.json

Lines changed: 73 additions & 0 deletions
@@ -0,0 +1,73 @@
+{
+    "contract": {
+        "schemas": {
+            "cast": {
+                "fields": {
+                    "name": "str",
+                    "role": "str",
+                    "date_joined": "date"
+                }
+            }
+        },
+        "error_details": "movies_contract_error_details.json",
+        "datasets": {
+            "movies": {
+                "fields": {
+                    "title": "str",
+                    "year": "int",
+                    "genre": {
+                        "type": "str",
+                        "is_array": true
+                    },
+                    "duration_minutes": "int",
+                    "ratings": {
+                        "type": "NonNegativeFloat",
+                        "is_array": true
+                    },
+                    "cast": {
+                        "model": "cast",
+                        "is_array": true
+                    }
+                },
+                "reader_config": {
+                    ".json": {
+                        "reader": "DuckDBJSONReader"
+                    }
+                },
+                "mandatory_fields": [
+                    "title",
+                    "year"
+                ]
+            }
+        }
+    },
+    "transformations": {
+        "parameters": {"entity": "movies"},
+        "reference_data": {
+            "sequels": {
+                "type": "table",
+                "database": "movies_refdata",
+                "table_name": "sequels"
+            }
+        },
+        "rule_stores": [
+            {
+                "store_type": "json",
+                "filename": "movies_ddb_rule_store.json"
+            }
+        ],
+        "complex_rules": [
+            {
+                "rule_name": "ratings_count"
+            },
+            {
+                "rule_name": "poor_sequel_check",
+                "parameters": {
+                    "sequel_entity": "refdata_sequels"
+                }
+            }
+        ]
+    }
+}
tests/testdata/movies/movies_ddb_rule_store.json

Lines changed: 92 additions & 0 deletions
@@ -0,0 +1,92 @@
+{
+    "ratings_count": {
+        "description": "Ensure more than 1 rating",
+        "type": "complex_rule",
+        "parameter_descriptions": {
+            "entity": "The entity to apply the workflow to."
+        },
+        "parameter_defaults": {},
+        "rule_config": {
+            "rules": [
+                {
+                    "name": "Get count of ratings",
+                    "operation": "add",
+                    "entity": "{{entity}}",
+                    "column_name": "no_of_ratings",
+                    "expression": "length(ratings)"
+                }
+            ],
+            "filters": [
+                {
+                    "name": "filter_too_few_ratings",
+                    "entity": "{{entity}}",
+                    "expression": "no_of_ratings > 1",
+                    "error_code": "LIMITED_RATINGS",
+                    "reporting_field": "title",
+                    "failure_message": "Movie has too few ratings"
+                }
+            ],
+            "post_filter_rules": [
+                {
+                    "name": "Remove the no_of_ratings field",
+                    "operation": "remove",
+                    "entity": "{{entity}}",
+                    "column_name": "no_of_ratings"
+                }
+            ]
+        }
+    },
+    "poor_sequel_check": {
+        "description": "check if bad sequel exists",
+        "type": "complex_rule",
+        "parameter_descriptions": {
+            "entity": "The entity to apply the workflow to.",
+            "sequel_entity": "The entity containing sequel data"
+        },
+        "parameter_defaults": {},
+        "rule_config": {
+            "rules": [
+                {
+                    "name": "Join sequel data",
+                    "operation": "inner_join",
+                    "entity": "{{entity}}",
+                    "target": "{{sequel_entity}}",
+                    "join_condition": "{{entity}}.title = {{sequel_entity}}.sequel_to",
+                    "new_entity_name": "with_sequels",
+                    "new_columns": {
+                        "{{sequel_entity}}.ratings": "sequel_rating"
+                    }
+                },
+                {
+                    "name": "Get median sequel rating",
+                    "operation": "group_by",
+                    "entity": "with_sequels",
+                    "group_by": "title",
+                    "agg_columns": {
+                        "list_aggregate(sequel_rating, 'median')": "median_sequel_rating"
+                    }
+                }
+            ],
+            "filters": [
+                {
+                    "name": "filter_rubbish_sequel",
+                    "entity": "with_sequels",
+                    "expression": "median_sequel_rating > 5",
+                    "error_code": "RUBBISH_SEQUEL",
+                    "reporting_entity": "derived",
+                    "reporting_field": "title",
+                    "failure_message": "Movie has rubbish sequel",
+                    "is_informational": true
+                }
+            ],
+            "post_filter_rules": [
+                {
+                    "name": "Remove the with_sequel entity",
+                    "operation": "remove_entity",
+                    "entity": "with_sequels"
+                }
+            ]
+        }
+    }
+}
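The double-brace placeholders are what make these rules templatable: the complex_rules entries in the schemas supply values such as entity=movies and sequel_entity=refdata_sequels. A minimal sketch of that substitution (the actual dve implementation may use a proper template engine rather than a regex):

import json
import re

def render_rule(rule_json: str, parameters: dict) -> dict:
    """Replace {{name}} placeholders with parameter values, then parse."""
    rendered = re.sub(r"\{\{(\w+)\}\}", lambda m: parameters[m.group(1)], rule_json)
    return json.loads(rendered)

rule = render_rule(
    '{"entity": "{{entity}}", "target": "{{sequel_entity}}"}',
    {"entity": "movies", "sequel_entity": "refdata_sequels"},
)
assert rule == {"entity": "movies", "target": "refdata_sequels"}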
