Skip to content

Commit 91a16c9

Browse files
committed
Separate the bulk of the '_analyse' logic from the while loop it is in; '_analyse' is now run in the while loop from '_analyse_in_thread'; grouped SPA/tomo-specific Analyser attributes in a separate block
1 parent 3f56224 commit 91a16c9

1 file changed

Lines changed: 168 additions & 170 deletions

File tree

src/murfey/client/analyser.py

Lines changed: 168 additions & 170 deletions
Original file line numberDiff line numberDiff line change
@@ -56,23 +56,14 @@ def __init__(
5656
):
5757
super().__init__()
5858
self._basepath = basepath_local.absolute()
59+
self._token = token
60+
self._environment = environment
5961
self._limited = limited
6062
self._experiment_type = ""
6163
self._acquisition_software = ""
62-
self._extension: str = ""
63-
self._unseen_xml: list = []
6464
self._context: Context | None = None
65-
self._batch_store: dict = {}
66-
self._environment = environment
67-
self._force_mdoc_metadata = force_mdoc_metadata
68-
self._token = token
69-
self._serialem = serialem
70-
self.parameters_model: (
71-
Type[ProcessingParametersSPA] | Type[ProcessingParametersTomo] | None
72-
) = None
73-
7465
self.queue: queue.Queue = queue.Queue()
75-
self.thread = threading.Thread(name="Analyser", target=self._analyse)
66+
self.thread = threading.Thread(name="Analyser", target=self._analyse_in_thread)
7667
self._stopping = False
7768
self._halt_thread = False
7869
self._murfey_config = (
@@ -85,6 +76,17 @@ def __init__(
8576
else {}
8677
)
8778

79+
# SPA & Tomo-specific attributes
80+
self._extension: str = ""
81+
self._unseen_xml: list = []
82+
self._batch_store: dict = {}
83+
self._force_mdoc_metadata = force_mdoc_metadata
84+
self._mdoc_for_reading: Path | None = None
85+
self._serialem = serialem
86+
self.parameters_model: (
87+
Type[ProcessingParametersSPA] | Type[ProcessingParametersTomo] | None
88+
) = None
89+
8890
def __repr__(self) -> str:
8991
return f"<Analyser ({self._basepath})>"
9092

@@ -334,9 +336,8 @@ def post_transfer(self, transferred_file: Path):
334336
f"An exception was encountered post transfer: {e}", exc_info=True
335337
)
336338

337-
def _analyse(self):
339+
def _analyse_in_thread(self):
338340
logger.info("Analyser thread started")
339-
mdoc_for_reading = None
340341
while not self._halt_thread:
341342
transferred_file = self.queue.get()
342343
transferred_file = (
@@ -347,185 +348,182 @@ def _analyse(self):
347348
if not transferred_file:
348349
self._halt_thread = True
349350
continue
350-
if self._limited:
351-
if (
352-
"Metadata" in transferred_file.parts
353-
or transferred_file.name == "EpuSession.dm"
354-
and not self._context
355-
):
356-
if not (context := _get_context("SPAMetadataContext")):
357-
continue
358-
self._context = context.load()(
359-
"epu",
360-
self._basepath,
361-
self._murfey_config,
362-
self._token,
363-
)
364-
elif (
365-
"Batch" in transferred_file.parts
366-
or "SearchMaps" in transferred_file.parts
367-
or transferred_file.name == "Session.dm"
368-
and not self._context
369-
):
370-
if not (context := _get_context("TomographyMetadataContext")):
371-
continue
372-
self._context = context.load()(
373-
"tomo",
374-
self._basepath,
375-
self._murfey_config,
376-
self._token,
377-
)
378-
self.post_transfer(transferred_file)
379-
else:
380-
dc_metadata = {}
381-
if not self._serialem and (
382-
self._force_mdoc_metadata
383-
and transferred_file.suffix == ".mdoc"
384-
or mdoc_for_reading
385-
):
386-
if self._context:
387-
try:
388-
dc_metadata = self._context.gather_metadata(
389-
mdoc_for_reading or transferred_file,
390-
environment=self._environment,
391-
)
392-
except KeyError as e:
393-
logger.error(
394-
f"Metadata gathering failed with a key error for key: {e.args[0]}"
395-
)
396-
raise e
397-
if not dc_metadata:
398-
mdoc_for_reading = None
399-
elif transferred_file.suffix == ".mdoc":
400-
mdoc_for_reading = transferred_file
401-
if not self._context:
402-
if not self._find_extension(transferred_file):
403-
logger.debug(f"No extension found for {transferred_file}")
404-
continue
405-
if not self._find_context(transferred_file):
406-
logger.debug(
407-
f"Couldn't find context for {str(transferred_file)!r}"
351+
self._analyse(transferred_file)
352+
self.queue.task_done()
353+
logger.debug("Analyer thread has stopped analysing incoming files")
354+
self.notify(final=True)
355+
356+
def _analyse(self, transferred_file: Path):
357+
if self._limited:
358+
if (
359+
"Metadata" in transferred_file.parts
360+
or transferred_file.name == "EpuSession.dm"
361+
and not self._context
362+
):
363+
if not (context := _get_context("SPAMetadataContext")):
364+
return
365+
self._context = context.load()(
366+
"epu",
367+
self._basepath,
368+
self._murfey_config,
369+
self._token,
370+
)
371+
elif (
372+
"Batch" in transferred_file.parts
373+
or "SearchMaps" in transferred_file.parts
374+
or transferred_file.name == "Session.dm"
375+
and not self._context
376+
):
377+
if not (context := _get_context("TomographyMetadataContext")):
378+
return
379+
self._context = context.load()(
380+
"tomo",
381+
self._basepath,
382+
self._murfey_config,
383+
self._token,
384+
)
385+
self.post_transfer(transferred_file)
386+
else:
387+
dc_metadata = {}
388+
if not self._serialem and (
389+
self._force_mdoc_metadata
390+
and transferred_file.suffix == ".mdoc"
391+
or self._mdoc_for_reading
392+
):
393+
if self._context:
394+
try:
395+
dc_metadata = self._context.gather_metadata(
396+
self._mdoc_for_reading or transferred_file,
397+
environment=self._environment,
408398
)
409-
self.queue.task_done()
410-
continue
411-
elif self._extension:
412-
logger.info(
413-
f"Context found successfully for {transferred_file}"
399+
except KeyError as e:
400+
logger.error(
401+
f"Metadata gathering failed with a key error for key: {e.args[0]}"
414402
)
415-
try:
403+
raise e
404+
if not dc_metadata:
405+
self._mdoc_for_reading = None
406+
elif transferred_file.suffix == ".mdoc":
407+
self._mdoc_for_reading = transferred_file
408+
if not self._context:
409+
if not self._find_extension(transferred_file):
410+
logger.debug(f"No extension found for {transferred_file}")
411+
return
412+
if not self._find_context(transferred_file):
413+
logger.debug(f"Couldn't find context for {str(transferred_file)!r}")
414+
return
415+
elif self._extension:
416+
logger.info(f"Context found successfully for {transferred_file}")
417+
try:
418+
if self._context is not None:
416419
self._context.post_first_transfer(
417420
transferred_file,
418421
environment=self._environment,
419422
)
420-
except Exception as e:
421-
logger.error(f"Exception encountered: {e}")
422-
if "AtlasContext" not in str(self._context):
423-
if not dc_metadata:
424-
try:
423+
except Exception as e:
424+
logger.error(f"Exception encountered: {e}")
425+
if "AtlasContext" not in str(self._context):
426+
if not dc_metadata:
427+
try:
428+
if self._context is not None:
425429
dc_metadata = self._context.gather_metadata(
426430
self._xml_file(transferred_file),
427431
environment=self._environment,
428432
)
429-
except NotImplementedError:
430-
dc_metadata = {}
431-
except KeyError as e:
432-
logger.error(
433-
f"Metadata gathering failed with a key error for key: {e.args[0]}"
434-
)
435-
raise e
436-
except ValueError as e:
437-
logger.error(
438-
f"Metadata gathering failed with a value error: {e}"
439-
)
440-
if not dc_metadata or not self._force_mdoc_metadata:
441-
self._unseen_xml.append(transferred_file)
442-
else:
443-
self._unseen_xml = []
444-
if dc_metadata.get("file_extension"):
445-
self._extension = dc_metadata["file_extension"]
446-
else:
447-
dc_metadata["file_extension"] = self._extension
448-
dc_metadata["acquisition_software"] = (
449-
self._context._acquisition_software
450-
)
451-
self.notify(dc_metadata)
452-
453-
# Contexts that can be immediately posted without additional work
454-
elif "CLEMContext" in str(self._context):
455-
logger.debug(
456-
f"File {transferred_file.name!r} is part of CLEM workflow"
457-
)
458-
self.post_transfer(transferred_file)
459-
elif "FIBContext" in str(self._context):
460-
logger.debug(
461-
f"File {transferred_file.name!r} is part of the FIB workflow"
462-
)
463-
self.post_transfer(transferred_file)
464-
elif "SXTContext" in str(self._context):
465-
logger.debug(f"File {transferred_file.name!r} is an SXT file")
466-
self.post_transfer(transferred_file)
467-
elif "AtlasContext" in str(self._context):
468-
logger.debug(f"File {transferred_file.name!r} is part of the atlas")
469-
self.post_transfer(transferred_file)
470-
471-
# Handle files with tomography and SPA context differently
472-
elif not self._extension or self._unseen_xml:
473-
if not self._find_extension(transferred_file):
474-
logger.error(f"No extension found for {transferred_file}")
475-
continue
476-
if self._extension:
477-
logger.info(
478-
f"Extension found successfully for {transferred_file}"
479-
)
480-
try:
481-
self._context.post_first_transfer(
482-
transferred_file,
483-
environment=self._environment,
484-
)
485-
except Exception as e:
486-
logger.error(f"Exception encountered: {e}")
487-
if not dc_metadata:
488-
try:
489-
dc_metadata = self._context.gather_metadata(
490-
mdoc_for_reading
491-
or self._xml_file(transferred_file),
492-
environment=self._environment,
493-
)
433+
except NotImplementedError:
434+
dc_metadata = {}
494435
except KeyError as e:
495436
logger.error(
496437
f"Metadata gathering failed with a key error for key: {e.args[0]}"
497438
)
498439
raise e
440+
except ValueError as e:
441+
logger.error(
442+
f"Metadata gathering failed with a value error: {e}"
443+
)
499444
if not dc_metadata or not self._force_mdoc_metadata:
500-
mdoc_for_reading = None
501445
self._unseen_xml.append(transferred_file)
502-
if dc_metadata:
446+
else:
503447
self._unseen_xml = []
504448
if dc_metadata.get("file_extension"):
505449
self._extension = dc_metadata["file_extension"]
506450
else:
507451
dc_metadata["file_extension"] = self._extension
508-
dc_metadata["acquisition_software"] = (
509-
self._context._acquisition_software
452+
if self._context is not None:
453+
dc_metadata["acquisition_software"] = (
454+
self._context._acquisition_software
455+
)
456+
self.notify(dc_metadata)
457+
458+
# Contexts that can be immediately posted without additional work
459+
elif "CLEMContext" in str(self._context):
460+
logger.debug(f"File {transferred_file.name!r} is part of CLEM workflow")
461+
self.post_transfer(transferred_file)
462+
elif "FIBContext" in str(self._context):
463+
logger.debug(
464+
f"File {transferred_file.name!r} is part of the FIB workflow"
465+
)
466+
self.post_transfer(transferred_file)
467+
elif "SXTContext" in str(self._context):
468+
logger.debug(f"File {transferred_file.name!r} is an SXT file")
469+
self.post_transfer(transferred_file)
470+
elif "AtlasContext" in str(self._context):
471+
logger.debug(f"File {transferred_file.name!r} is part of the atlas")
472+
self.post_transfer(transferred_file)
473+
474+
# Handle files with tomography and SPA context differently
475+
elif not self._extension or self._unseen_xml:
476+
if not self._find_extension(transferred_file):
477+
logger.error(f"No extension found for {transferred_file}")
478+
return
479+
if self._extension:
480+
logger.info(f"Extension found successfully for {transferred_file}")
481+
try:
482+
self._context.post_first_transfer(
483+
transferred_file,
484+
environment=self._environment,
485+
)
486+
except Exception as e:
487+
logger.error(f"Exception encountered: {e}")
488+
if not dc_metadata:
489+
try:
490+
dc_metadata = self._context.gather_metadata(
491+
self._mdoc_for_reading
492+
or self._xml_file(transferred_file),
493+
environment=self._environment,
510494
)
511-
self.notify(dc_metadata)
512-
elif any(
513-
context in str(self._context)
514-
for context in (
515-
"SPAContext",
516-
"SPAMetadataContext",
517-
"TomographyContext",
518-
"TomographyMetadataContext",
519-
)
520-
):
521-
context = str(self._context).split(" ")[0].split(".")[-1]
522-
logger.debug(
523-
f"Transferring file {str(transferred_file)} with context {context!r}"
524-
)
525-
self.post_transfer(transferred_file)
526-
self.queue.task_done()
527-
logger.debug("Analyer thread has stopped analysing incoming files")
528-
self.notify(final=True)
495+
except KeyError as e:
496+
logger.error(
497+
f"Metadata gathering failed with a key error for key: {e.args[0]}"
498+
)
499+
raise e
500+
if not dc_metadata or not self._force_mdoc_metadata:
501+
self._mdoc_for_reading = None
502+
self._unseen_xml.append(transferred_file)
503+
if dc_metadata:
504+
self._unseen_xml = []
505+
if dc_metadata.get("file_extension"):
506+
self._extension = dc_metadata["file_extension"]
507+
else:
508+
dc_metadata["file_extension"] = self._extension
509+
dc_metadata["acquisition_software"] = (
510+
self._context._acquisition_software
511+
)
512+
self.notify(dc_metadata)
513+
elif any(
514+
context in str(self._context)
515+
for context in (
516+
"SPAContext",
517+
"SPAMetadataContext",
518+
"TomographyContext",
519+
"TomographyMetadataContext",
520+
)
521+
):
522+
context = str(self._context).split(" ")[0].split(".")[-1]
523+
logger.debug(
524+
f"Transferring file {str(transferred_file)} with context {context!r}"
525+
)
526+
self.post_transfer(transferred_file)
529527

530528
def _xml_file(self, data_file: Path) -> Path:
531529
if not self._environment:

0 commit comments

Comments
 (0)