|
16 | 16 | from polars import DataFrame |
17 | 17 | from polars.exceptions import ColumnNotFoundError |
18 | 18 |
|
| 19 | +from dve.pipeline.utils import SubmissionStatus |
| 20 | + |
19 | 21 |
|
20 | 22 | @dataclass |
21 | 23 | class SummaryItems: |
22 | 24 | """Items to go into the Summary sheet""" |
23 | 25 |
|
| 26 | + submission_status: SubmissionStatus = field(default_factory=SubmissionStatus) |
| 27 | + """The status of the submission""" |
24 | 28 | summary_dict: dict[str, Any] = field(default_factory=dict) |
25 | 29 | """Dictionary of items to show in the front sheet key is put into Column B |
26 | 30 | and value in column C""" |
@@ -84,9 +88,12 @@ def create_summary_sheet( |
84 | 88 |
|
85 | 89 | return summary |
86 | 90 |
|
87 | | - @staticmethod |
88 | | - def get_submission_status(aggregates: DataFrame) -> str: |
| 91 | + def get_submission_status(self, aggregates: DataFrame) -> str: |
89 | 92 | """Returns the status of the submission based on the error data""" |
| 93 | + if self.submission_status.processing_failed: |
| 94 | + return "There was an issue processing the submission. This will be investigated." |
| 95 | + if self.submission_status.validation_failed: |
| 96 | + return "File has been rejected" |
90 | 97 | if aggregates.is_empty(): |
91 | 98 | return "File has been accepted, no issues to report" |
92 | 99 | failures = aggregates["Type"].unique() |
@@ -134,149 +141,6 @@ def _add_submission_info(self, status: str, summary: Worksheet): |
134 | 141 | summary.append(["", ""]) |
135 | 142 |
|
136 | 143 |
|
@dataclass
class CombinedSummary(SummaryItems):
    """Writes the combined report summary tables

    These get split out over multiple lines based on the partition key of the dataset.

    Each of these sub tables has rows, with the row being defined by row_field
    and columns, with each one being filtered by column field.

    An example would look like this...

    {Current partition} Table heading
    partition_key   column_field_n  column_field_m  additional_column_1 additional_column_2 etc.
    first_partition 0               2               10                  14
    2nd_partition   3               4               11                  15

    {next partition} Table heading
    partition_key   column_field_n  column_field_m  additional_column_1 additional_column_2 etc.
    first_partition 0               5               10                  14
    2nd_partition   3               4               12                  15

    ...by default the value in the first_partition x column_field_n cell will be the "Count" field
    so it's the number of times that a particular column has occurred within a partition.

    or more concretely, in a dataset where the columns are `Submission_error` and `warning`, and the
    partition key is `file name` - the result would be the number of times a submission error or
    warning has occurred within a file.

    In the parent class there is an aggregations property, which allows custom aggregations to
    be added. If an aggregation is added to a field not in the column field
    (e.g. an additional column) then an aggregation and column mapping needs to be added for it.

    """

    column_field: str = "Type"
    """Field to display across the top of the table"""
    row_field: str = "file_name"
    """Field to display along the side of the table"""
    partition_key: str = "FeedType"
    """Key to split the data into multiple tables"""
    table_heading: str = "Files processed"
    """Heading for each partitioned table"""
    table_mapping: dict = field(default_factory=dict)
    """Mapping of a given column to a column in the dataframe, defaults to using Count"""

    def create_summary_sheet(
        self,
        summary: Worksheet,
        aggregates: DataFrame,
        status: str,
    ):
        """Creates a summary sheet for a combined error report

        Writes the submission info header, then one table per unique value of
        ``partition_key``, each filtered to that partition's rows.
        """
        self._add_submission_info(status, summary)

        # Columns come from the configured table_columns, falling back to
        # whatever column_field values appear in the data.
        try:
            agg_tables = aggregates[self.column_field].unique().to_list()
        except ColumnNotFoundError:
            agg_tables = []
        tables = self.table_columns or agg_tables
        tables = tables.copy()  # make sure not to mutate the original
        # Append any column values present in the data but missing from the
        # configured list, so no data is silently dropped.
        difference = set(agg_tables).difference(tables)
        if difference:
            tables.extend(difference)

        if self.additional_columns:
            tables.extend(self.additional_columns)

        if aggregates.is_empty():
            error_summary = aggregates
        else:
            groups = [self.column_field, self.row_field, self.partition_key]

            error_summary = (
                # chaining methods on dataframes seems to confuse mypy
                aggregates.group_by(groups).agg(*self.aggregations)  # type: ignore
            )
        tables = [table for table in tables if table is not None]
        column = self.partition_key
        keys = error_summary[column].unique()
        # One sub-table per partition value, in sorted (string) order.
        for item in sorted(str(key) for key in keys if key is not None):
            summary.append(["", f"{item} {self.table_heading}"])
            self._write_combined_table(
                summary,
                tables,
                error_summary.filter(pl.col(column) == pl.lit(item)),
            )
            summary.append([""])  # blank spacer row between tables
        return summary

    @staticmethod
    def get_submission_status(aggregates: DataFrame) -> str:
        """Returns the status of the submission based on the error data"""
        if aggregates.is_empty():
            return "Overall submission has been accepted, no issues to report"
        failures = aggregates["Type"].unique()
        if "Submission Failure" in failures:
            status = "Submission Failures found, overall submission has been rejected"
        elif "Warning" in failures:
            status = "Overall submission has been accepted, warnings found"
        else:
            status = "Overall submission has been accepted, no issues to report"
        return status

    def _write_combined_table(
        self,
        summary: Worksheet,
        tables: list[str],
        error_summary: DataFrame,
    ):
        """Writes one partition's table: a header row then one row per row_field value.

        Cells hold the aggregated count (or the mapped field from
        ``table_mapping``) for each (row heading, table column) pair, 0 when
        no matching rows exist.
        """
        try:
            agg_types = error_summary[self.row_field].unique().to_list()
        except ColumnNotFoundError:
            agg_types = []

        # Copy so we never mutate self.row_headings in place (mirrors the
        # defensive copy in create_summary_sheet).
        row_headings = list(self.row_headings or agg_types)
        # Append row values present in the data but missing from the configured
        # headings (set(agg_types) - row_headings, NOT the reverse, which would
        # duplicate already-listed headings).
        difference = set(agg_types).difference(row_headings)
        if difference:
            row_headings.extend(difference)

        row_headings = filter(bool, row_headings)

        summary.append(["", self.row_field.capitalize(), *map(str.capitalize, tables)])
        for row_type in sorted(row_headings):
            row: list[Any] = ["", row_type]
            for table in tables:
                count_field = self.table_mapping.get(table, "Count")
                if table in self.table_columns:
                    column_filter = pl.col(self.column_field) == pl.lit(table)
                else:
                    # Additional columns aren't filtered by column_field; True
                    # combines with the row filter via polars' reflected __rand__.
                    column_filter = True
                if error_summary.is_empty():
                    counts = error_summary
                else:
                    counts = error_summary.filter(  # type: ignore
                        column_filter & (pl.col(self.row_field) == pl.lit(row_type))
                    )[count_field]
                if counts.is_empty():
                    row.append(0)
                else:
                    row.append(counts[0])
            summary.append(row)
279 | | - |
280 | 144 | class ExcelFormat: |
281 | 145 | """Formats error data into an excel file""" |
282 | 146 |
|
|
0 commit comments