# TODO: Update script for Sample Processing - has_input for MassSpectrometry will have to be changed to be a processed sample id - not a biosample id
2121# TODO: og_url in ApiInfoGetter in metadata_gen_supplement.py to be regular url once Berkeley is integrated
2222# TODO: uncomment and comment testing lines (modules), ms, issue, etc. see below in run
23- # TODO: remove break statement
23+
2424
2525class MetadataGenerator :
2626
@@ -75,6 +75,8 @@ def __init__(self, metadata_file: str, data_dir: str, ref_calibration_path: str,
7575 "spectrometry data." )
7676 self .workflow_git_url = "https://github.com/microbiomedata/enviroMS"
7777 self .processed_data_category = "processed_data"
78+ self .workflow_param_data_category = "workflow_parameter_data"
79+ self .workflow_param_data_object_type = "Configuration toml"
7880
7981 def run (self ):
8082 """
@@ -83,16 +85,9 @@ def run(self):
8385 This method processes the metadata file, generates biosamples (if needed)
8486 and metadata, and manages the workflow for generating NOM analysis data.
8587 """
86- file_ext = '.d'
87-
88- raw_dir_zip = self .data_dir / Path ("raw_zip/" )
89- raw_dir_zip .mkdir (parents = True , exist_ok = True )
9088
91- results_dir = self .data_dir / Path ("results/" )
92- results_dir .mkdir (parents = True , exist_ok = True )
93-
94- registration_dir = self .data_dir / 'registration'
95- registration_dir .mkdir (parents = True , exist_ok = True )
89+ file_ext = '.d'
90+ raw_dir_zip , results_dir , registration_dir = self .setup_directories ()
9691 registration_file = registration_dir / self .database_dump_json_path
9792
9893 # Dictionary to track failures
@@ -115,19 +110,10 @@ def run(self):
115110 for index , row in tqdm (metadata_df .iterrows (), total = metadata_df .shape [0 ], desc = "\033 [95mProcessing rows\033 [0m" ):
116111 # Do not generate biosamples if biosample_id exists in spreadsheet
117112 try :
118- if parser .check_for_biosamples (row ):
119- emsl_metadata = parser .parse_no_biosample_metadata (row )
120- biosample_id = emsl_metadata .biosample_id
121- tqdm .write (f"Biosample already exists for { emsl_metadata .data_path } , will not generate Biosample..." )
122- else :
123- # Generate biosamples if no biosample_id in spreadsheet
124- emsl_metadata = parser .parse_biosample_metadata (row )
125- biosample = self .generate_biosample (biosamp_metadata = emsl_metadata )
126-
127- # Append biosample instance to nmdc_database
128- nmdc_database .biosample_set .append (biosample )
129- biosample_id = biosample .id
130- tqdm .write (f"Generating Biosamples for { emsl_metadata .data_path } " )
113+
114+ # Check if biosample_id is in metadata_csv. If no biosample_id, then will generate biosamples,
115+ # if biosample_id exists, will return None for biosample.
116+ emsl_metadata , biosample_id , biosample = self .handle_biosample (parser , row )
131117
132118 # Create raw_file_path
133119 raw_file_path = self .data_dir / emsl_metadata .data_path .with_suffix (file_ext )
@@ -136,66 +122,133 @@ def run(self):
136122 # issue, ms = run_nmdc_workflow((raw_file_path, self.ref_calibration_path, self.field_strength))
137123 # Remove these lines before pushing and uncomment above and uncomment module import. Uncomment ms.to_csv line as well.
138124 issue = None
139- ms = True
140-
141- try :
142- if ms :
143- if raw_file_path .suffix == '.d' :
144- raw_file_to_upload_path = Path (raw_dir_zip / raw_file_path .stem )
145- # Create a zip file
146- shutil .make_archive (raw_file_to_upload_path , 'zip' , raw_file_path )
147- else :
148- raw_file_to_upload_path = raw_file_path
149-
150- result_file_name = Path (raw_file_path .name )
151- output_file_path = results_dir / result_file_name .with_suffix ('.csv' )
152- # ms.to_csv(output_file_path, write_metadata=False)
153-
154- self .create_nmdc_metadata (raw_data_path = raw_file_to_upload_path .with_suffix ('.zip' ),
155- data_product_path = output_file_path ,
156- emsl_metadata = emsl_metadata ,
157- biosample_id = biosample_id ,
158- nom_metadata_db = nmdc_database )
159- else :
160- tqdm .write (f"Workflow issue for { raw_file_path } : { issue } " )
161- failed_metadata ['processing_errors' ].append ({
162- 'row_index' : index + 2 ,
163- 'filename' : str (raw_file_path ),
164- 'error' : f"Workflow issue: { issue } "
165- })
166-
167- except Exception as inst :
168- tqdm .write (f"Error processing { raw_file_path } : { str (inst )} " )
169- failed_metadata ['processing_errors' ].append ({
170- 'row_index' : index + 2 ,
171- 'filename' : str (raw_file_path ),
172- 'error' : str (inst )
173- })
174- continue
125+ class MockMS :
126+ def __init__ (self ):
127+ self .data = "test"
128+
129+ def to_csv (self , path , write_metadata = False ):
130+ # Just pass for testing
131+ pass
132+ ms = MockMS ()
133+
134+ if ms :
135+
136+ # Process data files
137+ raw_file_to_upload_path , output_file_path , toml_file_path = self .process_data_files (
138+ ms , raw_file_path , raw_dir_zip , results_dir
139+ )
140+
141+ # Generate NMDC metadata
142+ self .create_nmdc_metadata (raw_data_path = raw_file_to_upload_path .with_suffix ('.zip' ),
143+ data_product_path = output_file_path ,
144+ emsl_metadata = emsl_metadata ,
145+ biosample_id = biosample_id ,
146+ toml_workflow_param_path = toml_file_path ,
147+ nom_metadata_db = nmdc_database )
148+
149+ # Add biosample to database if it was newly generated
150+ if biosample :
151+ nmdc_database .biosample_set .append (biosample )
152+
153+ else :
154+ self .record_processing_error (failed_metadata , index , raw_file_path , f"Workflow issue: { issue } " )
175155
176156 except Exception as e :
177157 # Record the failed row with its error
178- failed_metadata [ 'processing_errors' ]. append ({
179- 'row_index' : index + 2 ,
180- 'filename' : row . get ( 'LC-MS filename' , 'Unknown' ) ,
181- 'error' : str ( e )
182- } )
183- tqdm . write ( f"Error processing row { index + 2 } : { str ( e ) } " )
158+ self . record_processing_error (
159+ failed_metadata ,
160+ index ,
161+ row . get ( 'LC-MS filename' , 'Unknown' ),
162+ str ( e )
163+ )
184164 continue
185165
186166 # At the end of processing, save the failed metadata if there are any errors
167+ self .save_error_log (failed_metadata , results_dir )
168+
169+ self .dump_nmdc_database (nmdc_database , registration_file )
170+
171+ tqdm .write ("\033 [92mMetadata processing completed.\033 [0m" )
172+
173+ def setup_directories (self ) -> tuple [Path , Path , Path ]:
174+ "Create and return necessary directory paths"
175+
176+ raw_dir_zip = self .data_dir / Path ("raw_zip/" )
177+ raw_dir_zip .mkdir (parents = True , exist_ok = True )
178+
179+ results_dir = self .data_dir / Path ("results/" )
180+ results_dir .mkdir (parents = True , exist_ok = True )
181+
182+ registration_dir = self .data_dir / 'registration'
183+ registration_dir .mkdir (parents = True , exist_ok = True )
184+
185+ return raw_dir_zip , results_dir , registration_dir
186+
187+ def handle_biosample (self , parser : MetadataParser , row : pd .Series ) -> tuple :
188+ """Process biosample data from row"""
189+
190+ if parser .check_for_biosamples (row ):
191+ emsl_metadata = parser .parse_no_biosample_metadata (row )
192+ biosample_id = emsl_metadata .biosample_id
193+ tqdm .write (f"Biosample already exists for { emsl_metadata .data_path } , will not generate Biosample..." )
194+ return emsl_metadata , biosample_id , None
195+ else :
196+ # Generate biosamples if no biosample_id in spreadsheet
197+ emsl_metadata = parser .parse_biosample_metadata (row )
198+ biosample = self .generate_biosample (biosamp_metadata = emsl_metadata )
199+ biosample_id = biosample .id
200+ tqdm .write (f"Generating Biosamples for { emsl_metadata .data_path } " )
201+ return emsl_metadata , biosample_id , biosample
202+
203+ def process_data_files (self , ms , raw_file_path : Path , raw_dir_zip : Path , results_dir : Path ) -> tuple [Path , Path , Path ]:
204+ #TODO uncomment necessary lines
205+ """Process and prepare data files"""
206+
207+ if raw_file_path .suffix == '.d' :
208+ raw_file_to_upload_path = Path (raw_dir_zip / raw_file_path .stem )
209+ # Create a zip file
210+ shutil .make_archive (raw_file_to_upload_path , 'zip' , raw_file_path )
211+ else :
212+ raw_file_to_upload_path = raw_file_path
213+
214+ result_file_name = Path (raw_file_path .name )
215+ output_file_path = results_dir / result_file_name .with_suffix ('.csv' )
216+ ms .to_csv (output_file_path , write_metadata = True )
217+ #to_csv will save two files two disc (csv and same path and file name but toml file) add this to the registry
218+ # switched write_metadata=True to be able to save the toml file which needs to be saved.
219+
220+ # Get workflow parameter toml path
221+ toml_file_path = output_file_path .with_suffix ('.toml' )
222+
223+ return raw_file_to_upload_path , output_file_path , toml_file_path
224+
225+ def record_processing_error (self , failed_metadata : dict , index : int , filename : str , error : str ) -> None :
226+ """Record a processing error in the failed_metadata dictionary and log it.
227+
228+ Args:
229+ failed_metadata: Dictionary tracking processing errors
230+ index: Row index from the dataframe
231+ filename: Name of the file being processed
232+ error: Error message or description
233+ """
234+ failed_metadata ['processing_errors' ].append ({
235+ 'row_index' : index + 2 ,
236+ 'filename' : str (filename ),
237+ 'error' : str (error )
238+ })
239+ tqdm .write (f"Error processing row { index + 2 } : { str (error )} " )
240+
241+ def save_error_log (self , failed_metadata : dict , results_dir : Path ) -> None :
242+ """Save error log if there are any errors."""
187243 if any (failed_metadata .values ()):
188244 error_file = results_dir / 'failed_metadata_rows.json'
189245 with open (error_file , 'w' ) as f :
190246 json .dump (failed_metadata , f , indent = 2 )
191247 tqdm .write (f"\n \033 [91mSome rows failed processing. See { error_file } for details.\033 [0m" )
192248
193- self .dump_nmdc_database (nmdc_database , registration_file )
194-
195- tqdm .write ("\033 [92mMetadata processing completed.\033 [0m" )
196-
197249 def create_nmdc_metadata (self , raw_data_path : Path , data_product_path : Path ,
198250 emsl_metadata : object , biosample_id : str ,
251+ toml_workflow_param_path : Path ,
199252 nom_metadata_db : nmdc .Database ):
200253 """
201254 Create NMDC metadata entries.
@@ -210,6 +263,8 @@ def create_nmdc_metadata(self, raw_data_path: Path, data_product_path: Path,
210263 The EMSL metadata object containing information about the sample.
211264 biosample_id : str
212265 The ID of the biosample.
266+ toml_workflow_param_path: Path
267+ The path to the workflow parameter metadata toml file.
213268 nom_metadata_db : nmdc.Database
214269 The database instance to store the generated metadata.
215270 """
@@ -242,6 +297,15 @@ def create_nmdc_metadata(self, raw_data_path: Path, data_product_path: Path,
242297 data_object_type = self .processed_data_object_type ,
243298 description = processed_data_object_desc ,
244299 was_generated_by = nom_analysis .id )
300+
301+ # Generate workflow parameter data object
302+ workflow_param_data_object_desc = (f"CoreMS processing parameters for natural organic matter analysis "
303+ "used to generate {processed_data_object.id}" )
304+ parameter_data_object = self .generate_data_object (file_path = toml_workflow_param_path ,
305+ data_category = self .workflow_param_data_category ,
306+ data_object_type = self .workflow_param_data_object_type ,
307+ description = workflow_param_data_object_desc )
308+
245309
246310 # Update the outputs for mass_spectrometry and nom_analysis
247311 self .update_outputs (mass_spec_obj = mass_spectrometry ,
@@ -254,6 +318,7 @@ def create_nmdc_metadata(self, raw_data_path: Path, data_product_path: Path,
254318 nom_metadata_db .workflow_execution_set .append (nom_analysis )
255319 nom_metadata_db .data_generation_set .append (mass_spectrometry )
256320 nom_metadata_db .data_object_set .append (processed_data_object )
321+ nom_metadata_db .data_object_set .append (parameter_data_object )
257322
258323 def mint_nmdc_id (self , nmdc_type : str ) -> list [str ]:
259324 """
0 commit comments