Skip to content

Commit 7923a20

Browse files
committed
feat: allow error messages in business rules to be templated
1 parent 3bfccde commit 7923a20

9 files changed

Lines changed: 64 additions & 55 deletions

File tree

src/dve/core_engine/backends/implementations/duckdb/rules.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
from dve.core_engine.constants import ROWID_COLUMN_NAME
5353
from dve.core_engine.functions import implementations as functions
5454
from dve.core_engine.message import FeedbackMessage
55+
from dve.core_engine.templating import template_object
5556
from dve.core_engine.type_hints import Messages
5657

5758

@@ -510,12 +511,14 @@ def notify(self, entities: DuckDBEntities, *, config: Notification) -> Messages:
510511
matched = matched.select(StarExpression(exclude=config.excluded_columns))
511512

512513
for record in matched.df().to_dict(orient="records"):
514+
# NOTE: only templates using values directly accessible in record - nothing nested
515+
# more complex extraction done in reporting module
513516
messages.append(
514517
FeedbackMessage(
515518
entity=config.reporting.reporting_entity_override or config.entity_name,
516519
record=record, # type: ignore
517520
error_location=config.reporting.legacy_location,
518-
error_message=config.reporting.message,
521+
error_message=template_object(config.reporting.message, record),
519522
failure_type=config.reporting.legacy_error_type,
520523
error_type=config.reporting.legacy_error_type,
521524
error_code=config.reporting.code,

src/dve/core_engine/backends/implementations/spark/rules.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
from dve.core_engine.constants import ROWID_COLUMN_NAME
4646
from dve.core_engine.functions import implementations as functions
4747
from dve.core_engine.message import FeedbackMessage
48+
from dve.core_engine.templating import template_object
4849
from dve.core_engine.type_hints import Messages
4950

5051

@@ -406,11 +407,13 @@ def notify(self, entities: SparkEntities, *, config: Notification) -> Messages:
406407

407408
for record in matched.toLocalIterator():
408409
messages.append(
410+
# NOTE: only templates using values directly accessible in record - nothing nested
411+
# more complex extraction done in reporting module
409412
FeedbackMessage(
410413
entity=config.reporting.reporting_entity_override or config.entity_name,
411414
record=record.asDict(recursive=True),
412415
error_location=config.reporting.legacy_location,
413-
error_message=config.reporting.message,
416+
error_message=template_object(config.reporting.message, record.asDict(recursive=True)),
414417
failure_type=config.reporting.legacy_error_type,
415418
error_type=config.reporting.legacy_error_type,
416419
error_code=config.reporting.code,

src/dve/core_engine/backends/metadata/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from pydantic import BaseModel
1111

1212
from dve.core_engine.backends.metadata.contract import DataContractMetadata, ReaderConfig
13-
from dve.core_engine.backends.metadata.reporting import ReportingConfig, UntemplatedReportingConfig
13+
from dve.core_engine.backends.metadata.reporting import ReportingConfig, LegacyReportingConfig
1414
from dve.core_engine.backends.metadata.rules import (
1515
AbstractStep,
1616
Aggregation,

src/dve/core_engine/backends/metadata/reporting.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ class BaseReportingConfig(BaseModel):
2727
2828
"""
2929

30-
UNTEMPLATED_FIELDS: ClassVar[Set[str]] = set()
30+
UNTEMPLATED_FIELDS: ClassVar[Set[str]] = {"message"}
3131
"""Fields that should not be templated."""
3232

3333
emit: Optional[str] = None
@@ -117,15 +117,13 @@ def template(
117117
else:
118118
variables = local_variables
119119
templated = template_object(self.dict(exclude=self.UNTEMPLATED_FIELDS), variables, "jinja")
120+
templated.update(self.dict(include=self.UNTEMPLATED_FIELDS))
120121
return type_(**templated)
121122

122123

123124
class ReportingConfig(BaseReportingConfig):
124125
"""A base model defining the 'final' reporting config for a message."""
125126

126-
UNTEMPLATED_FIELDS: ClassVar[Set[str]] = {"message"}
127-
"""Fields that should not be templated."""
128-
129127
emit: ErrorEmitValue = "record_failure"
130128
category: ErrorCategory = "Bad value"
131129

@@ -246,7 +244,7 @@ def get_location_value(
246244
return self.get_location_selector()(record)
247245

248246

249-
class UntemplatedReportingConfig(BaseReportingConfig):
247+
class LegacyReportingConfig(BaseReportingConfig):
250248
"""An untemplated reporting config. This _must_ be templated prior to use.
251249
252250
This class also enables the conversion of deprecated fields to their
@@ -356,7 +354,8 @@ def template(
356354
else:
357355
variables = local_variables
358356

359-
templated = template_object(self.dict(), variables, "jinja")
357+
templated = template_object(self.dict(exclude=self.UNTEMPLATED_FIELDS), variables, "jinja")
358+
templated.update(self.dict(include=self.UNTEMPLATED_FIELDS))
360359
error_location = templated.pop("legacy_location")
361360
reporting_field = templated.pop("legacy_reporting_field")
362361
if templated.get("location") is None:

src/dve/core_engine/backends/metadata/rules.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from typing_extensions import Literal
2121

2222
from dve.core_engine.backends.base.reference_data import ReferenceConfigUnion
23-
from dve.core_engine.backends.metadata.reporting import ReportingConfig, UntemplatedReportingConfig
23+
from dve.core_engine.backends.metadata.reporting import ReportingConfig, LegacyReportingConfig
2424
from dve.core_engine.templating import template_object
2525
from dve.core_engine.type_hints import (
2626
Alias,
@@ -234,7 +234,7 @@ class DeferredFilter(AbstractStep):
234234
removed from the source entity if the reporting level is a record-level error.
235235
236236
"""
237-
reporting: Union[ReportingConfig, UntemplatedReportingConfig]
237+
reporting: Union[ReportingConfig, LegacyReportingConfig]
238238
"""The reporting information for the filter."""
239239

240240
def template(

src/dve/core_engine/configuration/v1/filters.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from pydantic import BaseModel, Field
66

7-
from dve.core_engine.backends.metadata.reporting import UntemplatedReportingConfig
7+
from dve.core_engine.backends.metadata.reporting import LegacyReportingConfig
88
from dve.core_engine.backends.metadata.rules import AbstractStep, DeferredFilter
99
from dve.core_engine.type_hints import ErrorCategory
1010

@@ -27,7 +27,7 @@ class ConcreteFilterConfig(BaseModel):
2727

2828
def to_step(self) -> AbstractStep:
2929
"""Create a deferred filter from the concrete filter config."""
30-
reporting = UntemplatedReportingConfig(
30+
reporting = LegacyReportingConfig(
3131
code=self.error_code,
3232
message=self.failure_message,
3333
category=self.category,

tests/features/movies.feature

Lines changed: 42 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,46 @@
11
Feature: Pipeline tests using the movies dataset
2-
Tests for the processing framework which use the movies dataset.
2+
Tests for the processing framework which use the movies dataset.
33

4-
This tests submissions in JSON format, with configuration in JSON config files.
5-
Complex types are tested (arrays, nested structs)
4+
This tests submissions in JSON format, with configuration in JSON config files.
5+
Complex types are tested (arrays, nested structs)
66

7-
Some validation of entity attributes is performed: SQL expressions and Python filter
8-
functions are used, and templatable business rules feature in the transformations.
7+
Some validation of entity attributes is performed: SQL expressions and Python filter
8+
functions are used, and templatable business rules feature in the transformations.
99

1010
Scenario: Validate and filter movies (spark)
11-
Given I submit the movies file movies.json for processing
12-
And A spark pipeline is configured
13-
And I create the following reference data tables in the database movies_refdata
14-
| table_name | parquet_path |
15-
| sequels | tests/testdata/movies/refdata/movies_sequels.parquet |
16-
And I add initial audit entries for the submission
17-
Then the latest audit record for the submission is marked with processing status file_transformation
18-
When I run the file transformation phase
19-
Then the movies entity is stored as a parquet after the file_transformation phase
20-
And the latest audit record for the submission is marked with processing status data_contract
21-
When I run the data contract phase
22-
Then there are 3 record rejections from the data_contract phase
23-
And there are errors with the following details and associated error_count from the data_contract phase
24-
| ErrorCode | ErrorMessage | error_count |
25-
| BLANKYEAR | year not provided | 1 |
26-
| DODGYYEAR | year value (NOT_A_NUMBER) is invalid | 1 |
27-
| DODGYDATE | date_joined value is not valid: daft_date | 1 |
28-
And the movies entity is stored as a parquet after the data_contract phase
29-
And the latest audit record for the submission is marked with processing status business_rules
30-
When I run the business rules phase
31-
Then The rules restrict "movies" to 4 qualifying records
32-
And At least one row from "movies" has generated error code "LIMITED_RATINGS"
33-
And At least one row from "derived" has generated error code "RUBBISH_SEQUEL"
34-
And the latest audit record for the submission is marked with processing status error_report
35-
When I run the error report phase
36-
Then An error report is produced
37-
And The statistics entry for the submission shows the following information
38-
| parameter | value |
39-
| record_count | 5 |
40-
| number_record_rejections | 4 |
41-
| number_warnings | 1 |
11+
Given I submit the movies file movies.json for processing
12+
And A spark pipeline is configured
13+
And I create the following reference data tables in the database movies_refdata
14+
| table_name | parquet_path |
15+
| sequels | tests/testdata/movies/refdata/movies_sequels.parquet |
16+
And I add initial audit entries for the submission
17+
Then the latest audit record for the submission is marked with processing status file_transformation
18+
When I run the file transformation phase
19+
Then the movies entity is stored as a parquet after the file_transformation phase
20+
And the latest audit record for the submission is marked with processing status data_contract
21+
When I run the data contract phase
22+
Then there are 3 record rejections from the data_contract phase
23+
And there are errors with the following details and associated error_count from the data_contract phase
24+
| ErrorCode | ErrorMessage | error_count |
25+
| BLANKYEAR | year not provided | 1 |
26+
| DODGYYEAR | year value (NOT_A_NUMBER) is invalid | 1 |
27+
| DODGYDATE | date_joined value is not valid: daft_date | 1 |
28+
And the movies entity is stored as a parquet after the data_contract phase
29+
And the latest audit record for the submission is marked with processing status business_rules
30+
When I run the business rules phase
31+
Then The rules restrict "movies" to 4 qualifying records
32+
And there are errors with the following details and associated error_count from the business_rules phase
33+
| ErrorCode | ErrorMessage | error_count |
34+
| LIMITED_RATINGS | Movie has too few ratings ([6.1]) | 1 |
35+
| RUBBISH_SEQUEL | The movie The Greatest Movie Ever has a rubbish sequel | 1 |
36+
And the latest audit record for the submission is marked with processing status error_report
37+
When I run the error report phase
38+
Then An error report is produced
39+
And The statistics entry for the submission shows the following information
40+
| parameter | value |
41+
| record_count | 5 |
42+
| number_record_rejections | 4 |
43+
| number_warnings | 1 |
4244

4345
Scenario: Validate and filter movies (duckdb)
4446
Given I submit the movies file movies.json for processing
@@ -62,8 +64,10 @@ Feature: Pipeline tests using the movies dataset
6264
And the latest audit record for the submission is marked with processing status business_rules
6365
When I run the business rules phase
6466
Then The rules restrict "movies" to 4 qualifying records
65-
And At least one row from "movies" has generated error code "LIMITED_RATINGS"
66-
And At least one row from "derived" has generated error code "RUBBISH_SEQUEL"
67+
And there are errors with the following details and associated error_count from the business_rules phase
68+
| ErrorCode | ErrorMessage | error_count |
69+
| LIMITED_RATINGS | Movie has too few ratings ([6.1]) | 1 |
70+
| RUBBISH_SEQUEL | The movie The Greatest Movie Ever has a rubbish sequel | 1 |
6771
And the latest audit record for the submission is marked with processing status error_report
6872
When I run the error report phase
6973
Then An error report is produced

tests/testdata/movies/movies_ddb_rule_store.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
"expression": "no_of_ratings > 1",
2424
"error_code": "LIMITED_RATINGS",
2525
"reporting_field": "title",
26-
"failure_message": "Movie has too few ratings"
26+
"failure_message": "Movie has too few ratings ({{ratings}})"
2727
}
2828
],
2929
"post_filter_rules": [
@@ -76,7 +76,7 @@
7676
"error_code": "RUBBISH_SEQUEL",
7777
"reporting_entity": "derived",
7878
"reporting_field": "title",
79-
"failure_message": "Movie has rubbish sequel",
79+
"failure_message": "The movie {{title}} has a rubbish sequel",
8080
"is_informational": true
8181
}
8282
],

tests/testdata/movies/movies_spark_rule_store.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
"expression": "no_of_ratings > 1",
2424
"error_code": "LIMITED_RATINGS",
2525
"reporting_field": "title",
26-
"failure_message": "Movie has too few ratings"
26+
"failure_message": "Movie has too few ratings ({{ratings}})"
2727
}
2828
],
2929
"post_filter_rules": [
@@ -87,7 +87,7 @@
8787
"error_code": "RUBBISH_SEQUEL",
8888
"reporting_entity": "derived",
8989
"reporting_field": "title",
90-
"failure_message": "Movie has rubbish sequel",
90+
"failure_message": "The movie {{title}} has a rubbish sequel",
9191
"is_informational": true
9292
}
9393
],

0 commit comments

Comments (0)