Skip to content

Commit bf170f4

Browse files
committed
feat: integrate record index into error report
1 parent dc0b68d commit bf170f4

11 files changed

Lines changed: 39 additions & 67 deletions

File tree

src/dve/core_engine/message.py

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,8 @@ class UserMessage:
116116
"The offending values"
117117
Category: ErrorCategory
118118
"The category of error"
119+
RecordIndex: Optional[int] = None
120+
"The record index where the error occurred (if applicable)"
119121

120122
@property
121123
def is_informational(self) -> bool:
@@ -187,6 +189,7 @@ class FeedbackMessage: # pylint: disable=too-many-instance-attributes
187189
"ErrorMessage",
188190
"ErrorCode",
189191
"ReportingField",
192+
"RecordIndex",
190193
"Value",
191194
"Category",
192195
]
@@ -224,15 +227,6 @@ def _validate_error_location(cls, value: Any) -> Optional[str]:
224227

225228
return str(value)
226229

227-
@validator("record")
228-
def _strip_rowid( # pylint: disable=no-self-argument
229-
cls, value: Optional[dict[str, Any]]
230-
) -> Optional[dict[str, Any]]:
231-
"""Strip the row ID column from the record, if present."""
232-
if isinstance(value, dict):
233-
value.pop(RECORD_INDEX_COLUMN_NAME, None)
234-
return value
235-
236230
@property
237231
def is_critical(self) -> bool:
238232
"""Whether the error is unrecoverable."""
@@ -333,6 +327,7 @@ def to_row(
333327
error_message,
334328
self.error_code,
335329
self.reporting_field_name or reporting_field,
330+
self.record.get(RECORD_INDEX_COLUMN_NAME),
336331
value,
337332
self.category,
338333
)

src/dve/pipeline/pipeline.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -432,7 +432,9 @@ def apply_data_contract(
432432

433433
for path, _ in fh.iter_prefix(read_from):
434434
entity_locations[fh.get_file_name(path)] = path
435-
entities[fh.get_file_name(path)] = self.data_contract.read_parquet(path)
435+
entities[fh.get_file_name(path)] = self.data_contract.add_record_index(
436+
self.data_contract.read_parquet(path)
437+
)
436438

437439
key_fields = {model: conf.reporting_fields for model, conf in model_config.items()}
438440

@@ -743,6 +745,7 @@ def _get_error_dataframes(self, submission_id: str):
743745
pl.col("ErrorCode").alias("Error_Code"), # type: ignore
744746
pl.col("ReportingField").alias("Data_Item"), # type: ignore
745747
pl.col("ErrorMessage").alias("Error"), # type: ignore
748+
pl.col("RecordIndex").alias("Record_Index"),
746749
pl.col("Value"), # type: ignore
747750
pl.col("Key").alias("ID"), # type: ignore
748751
pl.col("Category"), # type: ignore

src/dve/reporting/error_report.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
"Error_Code": Utf8(),
1919
"Data_Item": Utf8(),
2020
"Error": Utf8(),
21+
"Record_Index": pl.UInt32(),
2122
"Value": Utf8(),
2223
"ID": Utf8(),
2324
"Category": Utf8(),

tests/features/books.feature

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,33 +4,6 @@ Feature: Pipeline tests using the books dataset
44
This tests submissions using nested, complex JSON datasets with arrays, and
55
introduces more complex transformations that require aggregation.
66

7-
Scenario: Validate complex nested XML data (spark)
8-
Given I submit the books file nested_books.XML for processing
9-
And A spark pipeline is configured with schema file 'nested_books.dischema.json'
10-
And I add initial audit entries for the submission
11-
Then the latest audit record for the submission is marked with processing status file_transformation
12-
When I run the file transformation phase
13-
Then the header entity is stored as a parquet after the file_transformation phase
14-
And the nested_books entity is stored as a parquet after the file_transformation phase
15-
And the latest audit record for the submission is marked with processing status data_contract
16-
When I run the data contract phase
17-
Then there is 1 record rejection from the data_contract phase
18-
And the header entity is stored as a parquet after the data_contract phase
19-
And the nested_books entity is stored as a parquet after the data_contract phase
20-
And the latest audit record for the submission is marked with processing status business_rules
21-
When I run the business rules phase
22-
Then The rules restrict "nested_books" to 3 qualifying records
23-
And The entity "nested_books" contains an entry for "17.85" in column "total_value_of_books"
24-
And the nested_books entity is stored as a parquet after the business_rules phase
25-
And the latest audit record for the submission is marked with processing status error_report
26-
When I run the error report phase
27-
Then An error report is produced
28-
And The statistics entry for the submission shows the following information
29-
| parameter | value |
30-
| record_count | 4 |
31-
| number_record_rejections | 2 |
32-
| number_warnings | 0 |
33-
347
Scenario: Validate complex nested XML data (duckdb)
358
Given I submit the books file nested_books.XML for processing
369
And A duckdb pipeline is configured with schema file 'nested_books_ddb.dischema.json'

tests/features/movies.feature

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,18 @@ Feature: Pipeline tests using the movies dataset
2121
When I run the data contract phase
2222
Then there are 3 record rejections from the data_contract phase
2323
And there are errors with the following details and associated error_count from the data_contract phase
24-
| Entity | ErrorCode | ErrorMessage | error_count |
25-
| movies | BLANKYEAR | year not provided | 1 |
26-
| movies_rename_test | DODGYYEAR | year value (NOT_A_NUMBER) is invalid | 1 |
27-
| movies | DODGYDATE | date_joined value is not valid: daft_date | 1 |
24+
| Entity | ErrorCode | ErrorMessage | RecordIndex | error_count |
25+
| movies | BLANKYEAR | year not provided | 2 | 1 |
26+
| movies_rename_test | DODGYYEAR | year value (NOT_A_NUMBER) is invalid | 1 | 1 |
27+
| movies | DODGYDATE | date_joined value is not valid: daft_date | 1 | 1 |
2828
And the movies entity is stored as a parquet after the data_contract phase
2929
And the latest audit record for the submission is marked with processing status business_rules
3030
When I run the business rules phase
3131
Then The rules restrict "movies" to 4 qualifying records
3232
And there are errors with the following details and associated error_count from the business_rules phase
33-
| ErrorCode | ErrorMessage | error_count |
34-
| LIMITED_RATINGS | Movie has too few ratings ([6.5]) | 1 |
35-
| RUBBISH_SEQUEL | The movie The Greatest Movie Ever has a rubbish sequel | 1 |
33+
| ErrorCode | ErrorMessage | RecordIndex | error_count |
34+
| LIMITED_RATINGS | Movie has too few ratings ([6.5]) | 4 | 1 |
35+
| RUBBISH_SEQUEL | The movie The Greatest Movie Ever has a rubbish sequel | 1 | 1 |
3636
And the latest audit record for the submission is marked with processing status error_report
3737
When I run the error report phase
3838
Then An error report is produced
@@ -57,18 +57,18 @@ Feature: Pipeline tests using the movies dataset
5757
When I run the data contract phase
5858
Then there are 3 record rejections from the data_contract phase
5959
And there are errors with the following details and associated error_count from the data_contract phase
60-
| Entity | ErrorCode | ErrorMessage | error_count |
61-
| movies | BLANKYEAR | year not provided | 1 |
62-
| movies_rename_test | DODGYYEAR | year value (NOT_A_NUMBER) is invalid | 1 |
63-
| movies | DODGYDATE | date_joined value is not valid: daft_date | 1 |
60+
| Entity | ErrorCode | ErrorMessage | RecordIndex | error_count |
61+
| movies | BLANKYEAR | year not provided | 2 | 1 |
62+
| movies_rename_test | DODGYYEAR | year value (NOT_A_NUMBER) is invalid | 1 | 1 |
63+
| movies | DODGYDATE | date_joined value is not valid: daft_date | 1 | 1 |
6464
And the movies entity is stored as a parquet after the data_contract phase
6565
And the latest audit record for the submission is marked with processing status business_rules
6666
When I run the business rules phase
6767
Then The rules restrict "movies" to 4 qualifying records
6868
And there are errors with the following details and associated error_count from the business_rules phase
69-
| ErrorCode | ErrorMessage | error_count |
70-
| LIMITED_RATINGS | Movie has too few ratings ([6.5]) | 1 |
71-
| RUBBISH_SEQUEL | The movie The Greatest Movie Ever has a rubbish sequel | 1 |
69+
| ErrorCode | ErrorMessage | RecordIndex | error_count |
70+
| LIMITED_RATINGS | Movie has too few ratings ([6.5]) | 4 | 1 |
71+
| RUBBISH_SEQUEL | The movie The Greatest Movie Ever has a rubbish sequel | 1 | 1 |
7272
And the latest audit record for the submission is marked with processing status error_report
7373
When I run the error report phase
7474
Then An error report is produced

tests/features/steps/utilities.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
"ErrorType",
2424
"ErrorLocation",
2525
"ErrorMessage",
26+
"RecordIndex",
2627
"ReportingField",
2728
"Category",
2829
]

tests/test_core_engine/test_message.py

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,8 @@
88
from pydantic import BaseModel, ValidationError
99
import pytest
1010

11-
from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
1211
from dve.core_engine.message import DEFAULT_ERROR_DETAIL, DataContractErrorDetail, FeedbackMessage
1312

14-
15-
def test_rowid_column_stripped():
16-
"""Ensure that the rowID column is stripped from FeedbackMessages."""
17-
18-
message = FeedbackMessage(
19-
entity="entity", record={"key": "value", RECORD_INDEX_COLUMN_NAME: "some identifier"}
20-
)
21-
22-
assert message.record.get(RECORD_INDEX_COLUMN_NAME) is None
23-
24-
2513
@pytest.mark.parametrize(
2614
("derived_column", "expected"),
2715
[

tests/test_pipeline/pipeline_helpers.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ def dodgy_planet_data_after_file_transformation() -> Iterator[Tuple[SubmissionIn
152152
"numberOfMoons": "-1",
153153
"hasRingSystem": "false",
154154
"hasGlobalMagneticField": "sometimes",
155+
"__record_index__": "1"
155156
}
156157
planet_contract_df = pl.DataFrame(
157158
planet_contract_data, {k: pl.Utf8() for k in planet_contract_data}
@@ -381,7 +382,8 @@ def error_data_after_business_rules() -> Iterator[Tuple[SubmissionInfo, str]]:
381382
"ErrorCode": "LONG_ORBIT",
382383
"ReportingField": "orbitalPeriod",
383384
"Value": "365.20001220703125",
384-
"Category": "Bad value"
385+
"Category": "Bad value",
386+
"RecordIndex": "1"
385387
},
386388
{
387389
"Entity": "planets",
@@ -394,7 +396,8 @@ def error_data_after_business_rules() -> Iterator[Tuple[SubmissionInfo, str]]:
394396
"ErrorCode": "STRONG_GRAVITY",
395397
"ReportingField": "gravity",
396398
"Value": "9.800000190734863",
397-
"Category": "Bad value"
399+
"Category": "Bad value",
400+
"RecordIndex": "1"
398401
}
399402
]"""
400403
)

tests/test_pipeline/test_spark_pipeline.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ def test_apply_data_contract_failed( # pylint: disable=redefined-outer-name
175175
"ErrorMessage": "is invalid",
176176
"ErrorCode": "BadValue",
177177
"ReportingField": "planet",
178+
"RecordIndex": "1",
178179
"Value": "EarthEarthEarthEarthEarthEarthEarthEarthEarth",
179180
"Category": "Bad value",
180181
},
@@ -188,6 +189,7 @@ def test_apply_data_contract_failed( # pylint: disable=redefined-outer-name
188189
"ErrorMessage": "is invalid",
189190
"ErrorCode": "BadValue",
190191
"ReportingField": "numberOfMoons",
192+
"RecordIndex": "1",
191193
"Value": "-1",
192194
"Category": "Bad value",
193195
},
@@ -201,6 +203,7 @@ def test_apply_data_contract_failed( # pylint: disable=redefined-outer-name
201203
"ErrorMessage": "is invalid",
202204
"ErrorCode": "BadValue",
203205
"ReportingField": "hasGlobalMagneticField",
206+
"RecordIndex": "1",
204207
"Value": "sometimes",
205208
"Category": "Bad value",
206209
},
@@ -347,6 +350,7 @@ def test_apply_business_rules_with_data_errors( # pylint: disable=redefined-out
347350
"ReportingField": "orbitalPeriod",
348351
"Value": "365.20001220703125",
349352
"Category": "Bad value",
353+
"RecordIndex": "1"
350354
},
351355
{
352356
"Entity": "planets",
@@ -360,6 +364,7 @@ def test_apply_business_rules_with_data_errors( # pylint: disable=redefined-out
360364
"ReportingField": "gravity",
361365
"Value": "9.800000190734863",
362366
"Category": "Bad value",
367+
"RecordIndex": "1"
363368
},
364369
]
365370

@@ -504,6 +509,7 @@ def test_error_report_where_report_is_expected( # pylint: disable=redefined-out
504509
"Error Code": "LONG_ORBIT",
505510
"Data Item Submission Name": "orbitalPeriod",
506511
"Errors and Warnings": "Planet has long orbital period",
512+
"Record Index": 1,
507513
"Value": 365.20001220703125,
508514
"ID": None,
509515
"Category": "Bad value",
@@ -516,6 +522,7 @@ def test_error_report_where_report_is_expected( # pylint: disable=redefined-out
516522
"Error Code": "STRONG_GRAVITY",
517523
"Data Item Submission Name": "gravity",
518524
"Errors and Warnings": "Planet has too strong gravity",
525+
"Record Index": 1,
519526
"Value": 9.800000190734863,
520527
"ID": None,
521528
"Category": "Bad value",

tests/testdata/movies/movies_ddb_rule_store.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@
6161
"name": "Get median sequel rating",
6262
"operation": "group_by",
6363
"entity": "with_sequels",
64-
"group_by": "title",
64+
"group_by": ["__record_index__", "title"],
6565
"agg_columns": {
6666
"list_aggregate(sequel_rating, 'median')": "median_sequel_rating"
6767
}

0 commit comments

Comments
 (0)