|
23 | 23 | # SOFTWARE. |
24 | 24 | # |
25 | 25 | ############################################################################### |
26 | | -from typing import Generic |
| 26 | +import json |
| 27 | +import os |
| 28 | +from pathlib import Path |
| 29 | +from typing import Any, Generic, Optional |
27 | 30 |
|
28 | 31 | from nodescraper.connection.inband import InBandConnectionManager, SSHConnectionParams |
29 | 32 | from nodescraper.generictypes import TAnalyzeArg, TCollectArg, TDataModel |
30 | 33 | from nodescraper.interfaces import DataPlugin |
| 34 | +from nodescraper.models import DataModel |
| 35 | +from nodescraper.utils import pascal_to_snake |
31 | 36 |
|
32 | 37 |
|
33 | 38 | class InBandDataPlugin( |
34 | 39 | DataPlugin[InBandConnectionManager, SSHConnectionParams, TDataModel, TCollectArg, TAnalyzeArg], |
35 | 40 | Generic[TDataModel, TCollectArg, TAnalyzeArg], |
36 | 41 | ): |
37 | | - """Base class for in band plugins""" |
| 42 | + """Base class for in band plugins.""" |
38 | 43 |
|
39 | 44 | CONNECTION_TYPE = InBandConnectionManager |
| 45 | + |
| 46 | + @classmethod |
| 47 | + def find_datamodel_path_in_run(cls, run_path: str) -> Optional[str]: |
| 48 | + """Find this plugin's collector datamodel file under a scraper run directory. |
| 49 | +
|
| 50 | + Args: |
| 51 | + run_path: Path to a scraper log run directory (e.g. scraper_logs_*). |
| 52 | +
|
| 53 | + Returns: |
| 54 | + Absolute path to the datamodel file, or None if not found. |
| 55 | + """ |
| 56 | + run_path = os.path.abspath(run_path) |
| 57 | + if not os.path.isdir(run_path): |
| 58 | + return None |
| 59 | + collector_cls = getattr(cls, "COLLECTOR", None) |
| 60 | + data_model_cls = getattr(cls, "DATA_MODEL", None) |
| 61 | + if not collector_cls or not data_model_cls: |
| 62 | + return None |
| 63 | + collector_dir = os.path.join( |
| 64 | + run_path, |
| 65 | + pascal_to_snake(cls.__name__), |
| 66 | + pascal_to_snake(collector_cls.__name__), |
| 67 | + ) |
| 68 | + if not os.path.isdir(collector_dir): |
| 69 | + return None |
| 70 | + result_path = os.path.join(collector_dir, "result.json") |
| 71 | + if not os.path.isfile(result_path): |
| 72 | + return None |
| 73 | + try: |
| 74 | + res_payload = json.loads(Path(result_path).read_text(encoding="utf-8")) |
| 75 | + if res_payload.get("parent") != cls.__name__: |
| 76 | + return None |
| 77 | + except (json.JSONDecodeError, OSError): |
| 78 | + return None |
| 79 | + want_json = data_model_cls.__name__.lower() + ".json" |
| 80 | + for fname in os.listdir(collector_dir): |
| 81 | + low = fname.lower() |
| 82 | + if low.endswith("datamodel.json") or low == want_json: |
| 83 | + return os.path.join(collector_dir, fname) |
| 84 | + if low.endswith(".log"): |
| 85 | + return os.path.join(collector_dir, fname) |
| 86 | + return None |
| 87 | + |
| 88 | + @classmethod |
| 89 | + def load_datamodel_from_path(cls, dm_path: str) -> Optional[TDataModel]: |
| 90 | + """Load this plugin's DATA_MODEL from a file path (JSON or .log). |
| 91 | +
|
| 92 | + Args: |
| 93 | + dm_path: Path to datamodel JSON or to a .log file (if DATA_MODEL |
| 94 | + implements import_model for that format). |
| 95 | +
|
| 96 | + Returns: |
| 97 | + Instance of DATA_MODEL or None if load fails. |
| 98 | + """ |
| 99 | + dm_path = os.path.abspath(dm_path) |
| 100 | + if not os.path.isfile(dm_path): |
| 101 | + return None |
| 102 | + data_model_cls = getattr(cls, "DATA_MODEL", None) |
| 103 | + if not data_model_cls: |
| 104 | + return None |
| 105 | + try: |
| 106 | + if dm_path.lower().endswith(".log"): |
| 107 | + import_model = getattr(data_model_cls, "import_model", None) |
| 108 | + if not callable(import_model): |
| 109 | + return None |
| 110 | + base_import = getattr(DataModel.import_model, "__func__", DataModel.import_model) |
| 111 | + if getattr(import_model, "__func__", import_model) is base_import: |
| 112 | + return None |
| 113 | + return import_model(dm_path) |
| 114 | + with open(dm_path, encoding="utf-8") as f: |
| 115 | + data = json.load(f) |
| 116 | + return data_model_cls.model_validate(data) |
| 117 | + except (json.JSONDecodeError, OSError, Exception): |
| 118 | + return None |
| 119 | + |
| 120 | + @classmethod |
| 121 | + def get_extracted_errors(cls, data_model: DataModel) -> Optional[list[str]]: |
| 122 | + """Compute extracted errors from datamodel for compare-runs (in memory only). |
| 123 | +
|
| 124 | + Args: |
| 125 | + data_model: Loaded DATA_MODEL instance. |
| 126 | +
|
| 127 | + Returns: |
| 128 | + Sorted list of error match strings, or None if not applicable. |
| 129 | + """ |
| 130 | + get_content = getattr(data_model, "get_compare_content", None) |
| 131 | + if not callable(get_content): |
| 132 | + return None |
| 133 | + try: |
| 134 | + content = get_content() |
| 135 | + except Exception: |
| 136 | + return None |
| 137 | + if not isinstance(content, str): |
| 138 | + return None |
| 139 | + analyzer_cls = getattr(cls, "ANALYZER", None) |
| 140 | + if not analyzer_cls: |
| 141 | + return None |
| 142 | + get_matches = getattr(analyzer_cls, "get_error_matches", None) |
| 143 | + if not callable(get_matches): |
| 144 | + return None |
| 145 | + try: |
| 146 | + matches = get_matches(content) |
| 147 | + return sorted(matches) if matches is not None else None |
| 148 | + except Exception: |
| 149 | + return None |
| 150 | + |
| 151 | + @classmethod |
| 152 | + def load_run_data(cls, run_path: str) -> Optional[dict[str, Any]]: |
| 153 | + """Load this plugin's run data from a scraper run directory for comparison. |
| 154 | +
|
| 155 | + Args: |
| 156 | + run_path: Path to a scraper log run directory or to a datamodel file. |
| 157 | +
|
| 158 | + Returns: |
| 159 | + Dict suitable for diffing with another run, or None if not found. |
| 160 | + """ |
| 161 | + run_path = os.path.abspath(run_path) |
| 162 | + if not os.path.exists(run_path): |
| 163 | + return None |
| 164 | + dm_path = run_path if os.path.isfile(run_path) else cls.find_datamodel_path_in_run(run_path) |
| 165 | + if not dm_path: |
| 166 | + return None |
| 167 | + data_model = cls.load_datamodel_from_path(dm_path) |
| 168 | + if data_model is None: |
| 169 | + return None |
| 170 | + out = data_model.model_dump(mode="json") |
| 171 | + extracted = cls.get_extracted_errors(data_model) |
| 172 | + if extracted is not None: |
| 173 | + out["extracted_errors"] = extracted |
| 174 | + return out |
0 commit comments