|
51 | 51 | from loguru import logger |
52 | 52 | from mth5.groups import RunGroup |
53 | 53 | from mth5.helpers import close_open_files |
54 | | -from mth5.timeseries.spectre import Spectrogram |
| 54 | +from mth5.processing import KernelDataset |
55 | 55 | from typing import Literal, Optional, Tuple, Union |
56 | 56 |
|
57 | 57 | import aurora.config.metadata.processing |
@@ -92,9 +92,6 @@ def make_stft_objects( |
92 | 92 | The data time series from the run to transform |
93 | 93 | units: Literal["MT", "SI"] = "MT", |
94 | 94 | expects "MT". May change so that this is the only accepted set of units |
95 | | - station_id: str |
96 | | - To be deprecated, this information is contained in the run_obj as |
97 | | - run_obj.station_group.metadata.id |
98 | 95 |
|
99 | 96 | Returns |
100 | 97 | ------- |
@@ -485,8 +482,8 @@ def get_spectrograms(tfk: TransferFunctionKernel, i_dec_level, units="MT"): |
485 | 482 |
|
486 | 483 | # Check first if TS processing or accessing FC Levels |
487 | 484 | for i, row in tfk.dataset_df.iterrows(): |
488 | | - # This iterator could be updated to iterate over row-pairs if remote is True, |
489 | | - # corresponding to simultaneous data |
| 485 | + # TODO: Consider updating this to iterate over row-pairs corresponding to simultaneous data. |
| 486 | + # Grouping by (sorted) starttime should accomplish this. |
490 | 487 |
|
491 | 488 | if not tfk.is_valid_dataset(row, i_dec_level): |
492 | 489 | continue |
@@ -515,7 +512,7 @@ def get_spectrograms(tfk: TransferFunctionKernel, i_dec_level, units="MT"): |
515 | 512 | # Pack FCs into h5 |
516 | 513 | dec_level_config = tfk.config.decimations[i_dec_level] |
517 | 514 | save_fourier_coefficients(dec_level_config, row, run_obj, stft_obj) |
518 | | - # TODO: 1st pass, cast stft_obj to a Spectrogram here |
| 515 | + # TODO: cast stft_obj to a Spectrogram here |
519 | 516 |
|
520 | 517 | stfts = append_chunk_to_stfts(stfts, stft_obj, row.remote) |
521 | 518 |
|
@@ -583,6 +580,12 @@ def process_mth5_legacy( |
583 | 580 | local_merged_stft_obj, remote_merged_stft_obj = merge_stfts(stfts, tfk) |
584 | 581 |
|
585 | 582 | # FC TF Interface here (see Note #3) |
| 583 | + try: |
| 584 | + extract_features(dec_level_config, tfk_dataset) |
| 585 | + calculate_weights(dec_level_config, tfk_dataset) |
| 586 | + except Exception as e: |
| 587 | + msg = f"Features could not be accessed -- {e}" |
| 588 | + logger.warning(msg) |
586 | 589 | # Feature Extraction, Selection of weights |
587 | 590 |
|
588 | 591 | ttfz_obj = process_tf_decimation_level( |
@@ -620,8 +623,7 @@ def process_mth5_legacy( |
620 | 623 |
|
621 | 624 | tfk.dataset.close_mth5s() |
622 | 625 | if return_collection: |
623 | | - # this is now really only to be used for debugging and may be deprecated soon |
624 | | - return tf_collection |
| 626 | + return tf_collection # mostly used for debugging and may be deprecated. |
625 | 627 | else: |
626 | 628 | return tf_cls |
627 | 629 |
|
@@ -686,3 +688,182 @@ def process_mth5( |
686 | 688 | logger.error(msg) |
687 | 689 | logger.exception(msg) |
688 | 690 | return |
| 691 | + |
| 692 | + |
| 693 | +def extract_features( |
| 694 | + dec_level_config: AuroraDecimationLevel, tfk_dataset: KernelDataset |
| 695 | +) -> Optional[pd.DataFrame]: |
| 696 | + |
| 697 | + """ |
| 698 | + Temporary placeholder. |
| 699 | +
|
| 700 | + In the future, features will be stored in the MTH5 and accessible as df or xr. |
| 701 | + Here we compute them in place -- it's inefficient, but allows a faster |
| 702 | + end-to-end prototype. |
| 703 | +
|
| 706 | + Development Note #4: |
| 707 | + Features that will be used for FC-weighting are expected to share a time axis with the FC data. |
| 708 | + (This could possibly be relaxed in the future with some interpolation, but for now we enforce it.) |
| 709 | + This means that striding-window features derived from time series should be implemented using the |
| 710 | + WindowedTimeSeries methods as in run_ts_to_stft (including truncation to clock-zero); a sketch of this alignment appears after this function. |
| 711 | +
|
| 712 | + Parameters |
| 713 | + ---------- |
| 714 | + dec_level_config: AuroraDecimationLevel -- processing config for this decimation level. |
| 715 | + tfk_dataset: KernelDataset -- dataset whose runs supply the time series. |
| 716 | +
|
| 717 | + Returns |
| 718 | + ------- |
| 719 | + features: pd.DataFrame, or None when computed in place and bound to each feature. |
| 720 | + """ |
| 721 | + try: |
| 722 | + features = read_features_from_mth5() |
| 723 | + return features |
| 724 | + except Exception as e: |
| 725 | + msg = f"Features could not be accessed from MTH5 -- {e}\n" |
| 726 | + msg += "Calculating features on the fly (development only)" |
| 727 | + logger.warning(msg) |
| 728 | + |
| 729 | + for chws in dec_level_config.channel_weight_specs: |
| 730 | + # Loop over features and compute them |
| 731 | + msg = f"{chws}" |
| 732 | + logger.info(msg) |
| 733 | + for fws in chws.feature_weight_specs: |
| 734 | + msg = f"feature weight spec: {fws}" |
| 735 | + logger.info(msg) |
| 736 | + feature = fws.feature |
| 737 | + msg = f"feature: {feature}" |
| 738 | + logger.info(msg) |
| 739 | + feature_chunks = [] |
| 740 | + if feature.name == "coherence": |
| 741 | + msg = f"{feature.name} is not supported as a data weighting feature" |
| 742 | + logger.warning(msg) |
| 743 | + elif feature.name == "multiple_coherence": |
| 744 | + msg = f"{feature.name} is not supported as a data weighting feature" |
| 745 | + raise NotImplementedError(msg) |
| 746 | + elif feature.name == "striding_window_coherence": |
| 747 | + # TODO: review logic in time_series_helpers.run_ts_to_stft. This logic could be |
| 748 | + # modified to work similarly. Importantly, we need to map the time axis of stft |
| 749 | + # onto the features |
| 750 | + feature.validate_station_ids( |
| 751 | + local_station_id=tfk_dataset.local_station_id, |
| 752 | + remote_station_id=tfk_dataset.remote_station_id, |
| 753 | + ) |
| 754 | + |
| 755 | + # Make main window scheme agree with the one used for the FC calculation |
| 756 | + feature.window = dec_level_config.stft.window |
| 757 | + feature.set_subwindow_from_window(fraction=0.25) |
| 758 | + |
| 759 | + # Get data required for computing the feature. |
| 760 | + # Loop over the runs (or run-pairs); this should be equivalent to grouping on start time. |
| 761 | + # TODO: consider mixing in valid-run info from processing_summary here (avoid windows longer than the data). |
| 762 | + # Desirable to have some "processing_run" iterator supplied by KernelDataset (see sketch after read_features_from_mth5). |
| 763 | + from aurora.pipelines.time_series_helpers import ( |
| 764 | + truncate_to_clock_zero, |
| 765 | + ) # TODO: consider storing truncated data |
| 766 | + |
| 767 | + tmp = tfk_dataset.df.copy(deep=True) |
| 768 | + group_by = [ |
| 769 | + "start", |
| 770 | + ] |
| 771 | + grouper = tmp.groupby( |
| 772 | + group_by |
| 773 | + )  # these groups should have one or two rows each |
| 774 | + for start, df in grouper: |
| 775 | + end = df.end.unique()[0] # nice to have this for info log |
| 776 | + logger.info("Access ch1 and ch2 ") |
| 777 | + ch1_row = df[df.station == feature.station1].iloc[0] |
| 778 | + ch1_data = ch1_row.run_dataarray.to_dataset("channel")[feature.ch1] |
| 779 | + ch1_data = truncate_to_clock_zero( |
| 780 | + decimation_obj=dec_level_config, run_xrds=ch1_data |
| 781 | + ) |
| 782 | + ch2_row = df[df.station == feature.station2].iloc[0] |
| 783 | + ch2_data = ch2_row.run_dataarray.to_dataset("channel")[feature.ch2] |
| 784 | + ch2_data = truncate_to_clock_zero( |
| 785 | + decimation_obj=dec_level_config, run_xrds=ch2_data |
| 786 | + ) |
| 787 | + msg = f"Data for computing {feature.name} on {start} -- {end} ready" |
| 788 | + logger.info(msg) |
| 789 | + # Compute the feature. |
| 790 | + freqs, coherence_spectrogram = feature.compute(ch1_data, ch2_data) |
| 791 | + |
| 792 | + # Get Time Axis (See Development Note #4) |
| 793 | + # Now create a WindowedTimeSeries and get the time axis |
| 794 | + from aurora.time_series.windowing_scheme import ( |
| 795 | + window_scheme_from_decimation, |
| 796 | + ) |
| 797 | + |
| 798 | + windowing_scheme = window_scheme_from_decimation( |
| 799 | + decimation=dec_level_config |
| 800 | + ) |
| 801 | + ch1_dataset = ch1_data.to_dataset() |
| 802 | + windowed_obj = windowing_scheme.apply_sliding_window( |
| 803 | + ch1_dataset, dt=1.0 / dec_level_config.decimation.sample_rate |
| 804 | + ) |
| 805 | + # Now, take the time axis off the windowed obj, and bind it to coherence_spectrogram |
| 806 | + # Check the logic in time_series_helpers.fft_xr_ds for how to do this, and note there is a model |
| 807 | + # there for multivariate. |
| 808 | + coherence_spectrogram_xr = xr.DataArray( |
| 809 | + coherence_spectrogram, |
| 810 | + dims=["time", "frequency"], |
| 811 | + coords={"frequency": freqs, "time": windowed_obj.time.data}, |
| 812 | + ) |
| 813 | + feature_chunks.append(coherence_spectrogram_xr) |
| 814 | + feature_data = xr.concat(feature_chunks, "time") |
| 815 | + feature.data = feature_data  # bind feature data to feature instance (temporary workaround) |
| 816 | + return |
| 817 | + |
| 818 | + |
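# Editor's sketch, illustrative only (the helper name is hypothetical, not aurora API):
# per Development Note #4 above, weighting features must share a time axis with the FC
# data. Assuming the feature was computed with the same windowing scheme as the STFT,
# the window-start time axis can simply be rebound to the per-window feature array.
def _align_feature_to_stft_time_axis(feature_values, freqs, window_times):
    """Return a (time, frequency) DataArray of feature values on the STFT time axis."""
    import numpy as np
    import xarray as xr

    feature_values = np.asarray(feature_values)
    if feature_values.shape != (len(window_times), len(freqs)):
        # Same-windowing assumption violated; interpolation would be needed instead.
        raise ValueError("feature windows do not line up with STFT windows")
    return xr.DataArray(
        feature_values,
        dims=["time", "frequency"],
        coords={"time": window_times, "frequency": freqs},
    )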
| 819 | +def read_features_from_mth5(): |
| 820 | + raise NotImplementedError |
| 821 | + |
| 822 | + |
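# Editor's sketch, illustrative only: the "processing_run" iterator wished for in
# extract_features (grouping simultaneous local/remote rows by start time) could look
# like this. It assumes the KernelDataset dataframe carries the 'start' column used
# in the groupby above; it is not an existing KernelDataset method.
def _iter_simultaneous_runs(dataset_df):
    """Yield (start, rows) pairs; each rows frame holds one or two simultaneous runs."""
    for start, rows in dataset_df.sort_values("start").groupby("start"):
        yield start, rows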
| 823 | +def calculate_weights( |
| 824 | + dec_level_config: AuroraDecimationLevel, tfk_dataset: KernelDataset |
| 825 | +): |
| 826 | + """ |
| 827 | + Placeholder. |
| 828 | +
|
| 829 | + Development Note #5. |
| 830 | + The calculation of weights from features is in general not a simple operation. |
| 831 | + Naive calculation by multiplying data with weight functions may be relatively simple, but even this is |
| 832 | + complicated by the selection of weight functions, and transition regions. Moreover, when using hard |
| 833 | + thresholds (weights that can go to zero) it is possible to unintentionally "zero-out" the whole dataset. |
| 834 | +
|
| 835 | + Such problems can be partly addressed by implementing strategies such as: if all weights are zero, keep |
| 836 | + the "best 20%" of the data, say. However, when multiple features are being used, we then need to handle |
| 837 | + ranking of data that may fail to qualify based on one feature, but OK based on another. General handling |
| 838 | + for this may involve case studies of features and their relationship to data quality, and should be done |
| 839 | + with visualization capability. |
| 840 | +
|
| 841 | + This analysis can be further complicated by the fact that some features are dynamic. Consider |
| 842 | + for example the Mahalanobis Distance (MD). This feature gives a reasonable way to down-weight data based |
| 843 | + on the empirical distributions, but suppose another feature is also employed (multiple coherence (MC), |
| 844 | + for example). The MD values of the data will change depending on whether the MD was computed before or |
| 845 | + after the MC trimming of the dataset. In the MD case, a reasonable workaround would be to add an MD option |
| 846 | + in the regression, rather than in the preliminary weighting, since the robust regression repeatedly |
| 847 | + recomputes weights based on updated distributions. Thus features can be divided into 'static' and |
| 848 | + 'dynamic'. Static features can be computed before regression and used to pre-weight the data, whereas |
| 849 | + dynamic features are better worked into the regression loops. |
| 850 | +
|
| 851 | + There may not be an ideal general solution, but it is likely that there will be a few "plays" that we |
| 852 | + can devise that will work to improve the data most of the time. |
| 853 | +
|
| 854 | + For the reasons outlined above, the weight calculation will be in development for some time, and will |
| 855 | + likely require feature-visualization tools before we settle on a fixed set of strategies. In the |
| 856 | + meantime, a minimal static-weighting sketch follows this function. |
| 857 | +
|
| 858 | + Parameters |
| 859 | + ---------- |
| 860 | + dec_level_config: AuroraDecimationLevel -- processing config for this decimation level. |
| 861 | + tfk_dataset: KernelDataset -- dataset whose data the weights apply to. |
| 862 | +
|
| 863 | + Returns |
| 864 | + ------- |
| 865 | +
|
| 866 | + """ |
| 867 | + msg = "Weight calculation not implemented" |
| 868 | + logger.error(msg) |
| 869 | + return |
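# Editor's sketch, illustrative only (function name and defaults are assumptions, not
# aurora API): Development Note #5 warns that hard thresholds can zero-out the whole
# dataset. One simple "play" is to fall back to keeping the best fraction of windows
# when every window fails the threshold, e.g. for a static feature such as coherence:
def _threshold_weights_with_floor(feature_values, threshold=0.5, keep_fraction=0.2):
    """Binary weights from 1D per-window feature values; keep the best if all fail."""
    import numpy as np

    feature_values = np.asarray(feature_values, dtype=float)
    weights = (feature_values >= threshold).astype(float)
    if not weights.any():
        # Hard threshold zeroed everything: keep the best keep_fraction instead.
        n_keep = max(1, int(keep_fraction * feature_values.size))
        weights[np.argsort(feature_values)[-n_keep:]] = 1.0
    return weights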