|
3 | 3 | import asyncio |
4 | 4 | import datetime |
5 | 5 | import logging |
| 6 | +import urllib |
6 | 7 | from pathlib import Path |
7 | 8 | from typing import Annotated, List, Optional |
8 | 9 |
|
@@ -126,7 +127,7 @@ async def setup_multigrid_watcher( |
126 | 127 | str(k): v for k, v in watcher_spec.destination_overrides.items() |
127 | 128 | }, |
128 | 129 | "rsync_restarts": watcher_spec.rsync_restarts, |
129 | | - "visit_end_time": session.visit_end_time, |
| 130 | + "visit_end_time": str(session.visit_end_time), |
130 | 131 | }, |
131 | 132 | headers={ |
132 | 133 | "Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}" |
@@ -396,6 +397,36 @@ async def finalise_session(session_id: MurfeySessionID, db=murfey_db): |
396 | 397 | return data |
397 | 398 |
|
398 | 399 |
|
| 400 | +@router.post("/sessions/{session_id}/multigrid_controller/visit_end_time") |
| 401 | +async def update_visit_end_time( |
| 402 | + session_id: MurfeySessionID, end_time: datetime.datetime, db=murfey_db |
| 403 | +): |
| 404 | + # Load data for session |
| 405 | + session_entry = db.exec(select(Session).where(Session.id == session_id)).one() |
| 406 | + instrument_name = session_entry.instrument_name |
| 407 | + |
| 408 | + # Update visit end time in database |
| 409 | + session_entry.visit_end_time = end_time |
| 410 | + db.add(session_entry) |
| 411 | + db.commit() |
| 412 | + |
| 413 | + # Update the multigrid controller |
| 414 | + data = {} |
| 415 | + machine_config = get_machine_config(instrument_name=instrument_name)[ |
| 416 | + instrument_name |
| 417 | + ] |
| 418 | + if machine_config.instrument_server_url: |
| 419 | + async with aiohttp.ClientSession() as clientsession: |
| 420 | + async with clientsession.post( |
| 421 | + f"{machine_config.instrument_server_url}{url_path_for('api.router', 'update_multigrid_controller_visit_end_time', session_id=session_id)}?end_time={urllib.parse.quote(end_time.isoformat())}", |
| 422 | + headers={ |
| 423 | + "Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}" |
| 424 | + }, |
| 425 | + ) as resp: |
| 426 | + data = await resp.json() |
| 427 | + return data |
| 428 | + |
| 429 | + |
399 | 430 | @router.post("/sessions/{session_id}/abandon_session") |
400 | 431 | async def abandon_session(session_id: MurfeySessionID, db=murfey_db): |
401 | 432 | data = {} |
@@ -473,6 +504,34 @@ async def restart_rsyncer( |
473 | 504 | return data |
474 | 505 |
|
475 | 506 |
|
| 507 | +@router.post("/sessions/{session_id}/flush_skipped_rsyncer") |
| 508 | +async def flush_skipped_rsyncer( |
| 509 | + session_id: MurfeySessionID, rsyncer_source: RsyncerSource, db=murfey_db |
| 510 | +): |
| 511 | + data = {} |
| 512 | + instrument_name = ( |
| 513 | + db.exec(select(Session).where(Session.id == session_id)).one().instrument_name |
| 514 | + ) |
| 515 | + machine_config = get_machine_config(instrument_name=instrument_name)[ |
| 516 | + instrument_name |
| 517 | + ] |
| 518 | + if isinstance(session_id, int): |
| 519 | + if machine_config.instrument_server_url: |
| 520 | + async with aiohttp.ClientSession() as clientsession: |
| 521 | + async with clientsession.post( |
| 522 | + f"{machine_config.instrument_server_url}{url_path_for('api.router', 'flush_skipped_rsyncer', session_id=session_id)}", |
| 523 | + json={ |
| 524 | + "label": session_id, |
| 525 | + "source": str(secure_path(Path(rsyncer_source.source))), |
| 526 | + }, |
| 527 | + headers={ |
| 528 | + "Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}" |
| 529 | + }, |
| 530 | + ) as resp: |
| 531 | + data = await resp.json() |
| 532 | + return data |
| 533 | + |
| 534 | + |
476 | 535 | class RSyncerInfo(BaseModel): |
477 | 536 | source: str |
478 | 537 | num_files_transferred: int |
|
0 commit comments