Conversation
…sion tests A new per-user regression test exposed a real stop-table concat bug when some users had empty outputs.

This commit hardens empty stop-table construction by deriving exact output columns and explicit dtypes from shared helpers, then using those typed empties in stop-detection paths. It also applies reset_index(drop=True) after grouped stop summarization and adds passthrough guards to avoid duplicate user_id columns.

Per-user regression tests were cleaned up and made faster:
- compare labels directly by (user_id, timestamp)
- remove offset/parts-style expectation logic
- run on a 4-user sample
- parameterize n_jobs with 1 and 2

For now, this is prototyped in dbstop.py and sequential.py via the focused per-user regression path that originally surfaced the bug, with shared helper changes ready for wider consolidation.
Replace split empty-stop schema helpers (column names + dtype map) with one shared helper that directly returns a typed empty stop DataFrame. Update all active stop-detection summarization callsites (dbstop, dbscan, density_based, hdbscan, lachesis, sequential, grid_based) to use the unified helper, removing duplicated empty-frame construction logic.
@andresmondragont, I recovered your code from AWS that had these apply_parallel implementations. We're going to need them. It also fixes #237, which you owed.
Pull request overview
Implements/extends per-user stop-detection wrappers (including optional parallel execution) and stabilizes empty-output schemas to prevent dtype/shape drift when concatenating results across user slices (Fixes #235).
Changes:
- Added/standardized per-user `*_per_user` and `*_labels_per_user` wrappers across clustering-based stop detection algorithms, with `n_jobs`/`print_progress` support.
- Unified empty stop-table creation into a schema/dtype-aware helper (`_get_empty_stop_df`) and standardized empty returns for label functions (Series vs DataFrame with `cluster`/`core`).
- Expanded regression tests for per-user wrapper correctness and for empty+non-empty label concatenation dtype stability.
Reviewed changes
Copilot reviewed 10 out of 11 changed files in this pull request and generated 15 comments.
Show a summary per file
| File | Description |
|---|---|
| nomad/tests/test_stop_detection.py | Adds per-user wrapper tests (n_jobs=1/2) and regression coverage for empty label concat schema stability. |
| nomad/stop_detection/utils.py | Introduces shared applyParallel and replaces empty stop-table column inference with typed _get_empty_stop_df. |
| nomad/stop_detection/sliding.py | Removes the legacy sliding staypoint implementation (including its local applyParallel). |
| nomad/stop_detection/sequential.py | Routes parallelization through utils.applyParallel, fixes empty label dtype, and adds detect_stops_labels_per_user. |
| nomad/stop_detection/lachesis.py | Stabilizes empty label dtype, uses _get_empty_stop_df, fixes passthrough handling, and adds lachesis_labels_per_user. |
| nomad/stop_detection/hdbscan.py | Stabilizes empty label dtype, uses _get_empty_stop_df, adds parallelized per-user stop wrapper params, and adds hdbscan_labels_per_user. |
| nomad/stop_detection/grid_based.py | Stabilizes empty label dtype, uses _get_empty_stop_df, parallelizes per-user stop wrapper, and adds grid_based_labels_per_user. |
| nomad/stop_detection/density_based.py | Standardizes empty seqscan_labels outputs and adds seqscan_per_user / seqscan_labels_per_user. |
| nomad/stop_detection/dbstop.py | Standardizes empty dbstop_labels outputs and adds/parallelizes dbstop_per_user / dbstop_labels_per_user. |
| nomad/stop_detection/dbscan.py | Standardizes empty ta_dbscan_labels outputs and adds/parallelizes ta_dbscan_per_user / ta_dbscan_labels_per_user. |
| .gitignore | Ignores a temporary PR description markdown file. |
```python
datetime_keys = ['datetime', 'start_datetime', 'end_datetime']
for key in datetime_keys:
    col = cols.get(key)
    if col in column_list:
        dtype_map[col] = 'datetime64[ns, UTC]'
```
_get_empty_stop_df hard-codes datetime columns to dtype 'datetime64[ns, UTC]'. In the default loader.from_df mixed_timezone_behavior='naive' workflow, trajectory datetimes are timezone-naive (datetime64[ns]), so empty stop tables will have a different dtype than non-empty outputs and concatenation can upcast to object. Consider inferring tz-awareness from the input DataFrame dtypes (e.g., pass df/dtypes instead of just columns) or using a timezone-naive dtype for the empty schema when the input datetime column is naive.
Suggested change:

```python
input_dtypes = kwargs.get('input_dtypes')
if input_dtypes is None and hasattr(input_columns, 'dtypes'):
    input_dtypes = input_columns.dtypes
datetime_dtype = 'datetime64[ns, UTC]'
if use_datetime and input_dtypes is not None:
    source_datetime_col = cols.get(t_key)
    source_dtype = None
    if source_datetime_col is not None:
        try:
            source_dtype = input_dtypes[source_datetime_col]
        except (KeyError, TypeError, IndexError):
            source_dtype = None
    if source_dtype is not None and pd.api.types.is_datetime64_any_dtype(source_dtype):
        if pd.api.types.is_datetime64tz_dtype(source_dtype):
            datetime_dtype = str(source_dtype)
        else:
            datetime_dtype = 'datetime64[ns]'
datetime_keys = ['datetime', 'start_datetime', 'end_datetime']
for key in datetime_keys:
    col = cols.get(key)
    if col in column_list:
        dtype_map[col] = datetime_dtype
```
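The upcast this comment warns about is easy to reproduce outside the codebase. A minimal pandas illustration (not the repo's helper): concatenating tz-naive values with a tz-aware series yields object dtype, while deriving the empty slice's dtype from the input keeps the result stable.

```python
import pandas as pd

# tz-naive data (the default loader behavior described above) vs tz-aware data
naive = pd.Series(pd.to_datetime(["2024-01-01", "2024-01-02"]))   # datetime64[ns]
aware = pd.Series(pd.to_datetime(["2024-01-03"], utc=True))       # datetime64[ns, UTC]
mixed = pd.concat([naive, aware], ignore_index=True)
print(mixed.dtype)  # object — pandas cannot unify naive and aware datetimes

# Deriving the empty schema's dtype from the input avoids the drift
empty = pd.Series([], dtype=naive.dtype)
stable = pd.concat([naive, empty], ignore_index=True)
print(stable.dtype)  # datetime64[ns]
```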
```python
grouped = data.groupby(uid, sort=False, as_index=False)
results = applyParallel(
    grouped,
    process_user_group,
    n_jobs=n_jobs,
    print_progress=print_progress
)

return pd.concat(results, ignore_index=True)
```
detect_stops_per_user unconditionally does pd.concat(results). If data is empty, groupby yields no groups, applyParallel returns an empty list, and pd.concat([]) raises a ValueError. Add an early return for empty input (or handle empty results) that returns a correctly schematized empty stop DataFrame.
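The failure mode is straightforward to confirm, and a small guard of the kind the comment suggests fixes it. `concat_user_results` and `empty_factory` below are hypothetical names for illustration, not the repo's API:

```python
import pandas as pd

def concat_user_results(results, empty_factory):
    # Hypothetical guard: when no groups produced output, return a typed
    # empty frame instead of letting pd.concat([]) raise.
    if not results:
        return empty_factory()
    return pd.concat(results, ignore_index=True)

# pd.concat with no objects raises ValueError
try:
    pd.concat([])
except ValueError as exc:
    print(type(exc).__name__)  # ValueError

# the guarded version returns a schema-correct empty stop table instead
empty = concat_user_results(
    [], lambda: pd.DataFrame({"user_id": pd.Series(dtype="object"),
                              "cluster": pd.Series(dtype="int64")})
)
print(len(empty), dict(empty.dtypes))
```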
```python
grouped = data.groupby(uid, sort=False)
results = applyParallel(
    grouped,
    process_user_group,
    n_jobs=n_jobs,
    print_progress=print_progress,
)

return pd.concat(results).reindex(data.index)
```
detect_stops_labels_per_user unconditionally does pd.concat(results). If data is empty, results is empty and pd.concat([]) raises a ValueError. Add an early return for empty input (or empty results) that returns an empty Series(name='cluster', dtype='int64') aligned to the input index.
```python
# Use applyParallel to process groups in parallel
grouped = data.groupby(uid, sort=False)
results = applyParallel(
    grouped,
    process_user_group,
    n_jobs=n_jobs,
    print_progress=print_progress
)

return pd.concat(results, ignore_index=True)
```
lachesis_per_user unconditionally does pd.concat(results). If data is empty, results is empty and pd.concat([]) raises a ValueError. Add an early return for empty input (or empty results) that returns a schema-correct empty stop DataFrame.
```python
grouped = data.groupby(uid, sort=False)
results = applyParallel(
    grouped,
    process_user_group,
    n_jobs=n_jobs,
    print_progress=print_progress,
)

return pd.concat(results).reindex(data.index)
```
lachesis_labels_per_user unconditionally does pd.concat(results). If data is empty, results is empty and pd.concat([]) raises a ValueError. Add an early return for empty input (or empty results) that returns an empty Series(name='cluster', dtype='int64') aligned to the input index.
```python
grouped = data.groupby(uid, sort=False)
results = utils.applyParallel(
    grouped,
    process_user_group,
    n_jobs=n_jobs,
    print_progress=print_progress,
)

return pd.concat(results).reindex(data.index)
```
seqscan_labels_per_user unconditionally does pd.concat(results). If data is empty, results is empty and pd.concat([]) raises a ValueError. Add an early return for empty input (or empty results) that returns the appropriate typed empty output (Series or cluster/core DataFrame depending on return_cores) aligned to the input index.
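A typed empty output of the shape this comment asks for can be built in a few lines. `empty_labels` is a hypothetical helper sketched here for illustration, not the repo's function:

```python
import pandas as pd

def empty_labels(index, return_cores=False):
    # Hypothetical helper: typed empty label output aligned to the input index,
    # mirroring the Series vs cluster/core DataFrame contract described above.
    if return_cores:
        return pd.DataFrame({"cluster": pd.Series(dtype="int64"),
                             "core": pd.Series(dtype="int64")}, index=index)
    return pd.Series(dtype="int64", name="cluster", index=index)

idx = pd.Index([], dtype="int64")
print(empty_labels(idx).dtype)                               # int64
print(list(empty_labels(idx, return_cores=True).columns))    # ['cluster', 'core']
```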
```python
grouped = data.groupby(uid, sort=False, as_index=False)
results = utils.applyParallel(
    grouped,
    process_user_group,
    n_jobs=n_jobs,
    print_progress=print_progress
)
return pd.concat(results, ignore_index=True)
```
dbstop_per_user unconditionally does pd.concat(results). If data is empty, results is empty and pd.concat([]) raises a ValueError. Add an early return for empty input (or empty results) that returns a schema-correct empty stop DataFrame (including the user_id passthrough column).
```python
grouped = data.groupby(uid, sort=False)
results = utils.applyParallel(
    grouped,
    process_user_group,
    n_jobs=n_jobs,
    print_progress=print_progress
)

return pd.concat(results).reindex(data.index)
```
dbstop_labels_per_user unconditionally does pd.concat(results). If data is empty, results is empty and pd.concat([]) raises a ValueError. Add an early return for empty input (or empty results) that returns the appropriate typed empty output (Series or cluster/core DataFrame depending on return_cores) aligned to the input index.
```python
grouped = data.groupby(uid, sort=False, as_index=False)
results = utils.applyParallel(
    grouped,
    process_user_group,
    n_jobs=n_jobs,
    print_progress=print_progress,
)
return pd.concat(results, ignore_index=True)
```
ta_dbscan_per_user unconditionally does pd.concat(results). If data is empty, results is empty and pd.concat([]) raises a ValueError. Add an early return for empty input (or empty results) that returns a schema-correct empty stop DataFrame (including the user_id passthrough column).
```python
grouped = data.groupby(uid, sort=False)
results = utils.applyParallel(
    grouped,
    process_user_group,
    n_jobs=n_jobs,
    print_progress=print_progress,
)

return pd.concat(results).reindex(data.index)
```
ta_dbscan_labels_per_user unconditionally does pd.concat(results). If data is empty, results is empty and pd.concat([]) raises a ValueError. Add an early return for empty input (or empty results) that returns the appropriate typed empty output (Series or cluster/core DataFrame depending on return_cores) aligned to the input index.
Pull request overview
Copilot reviewed 19 out of 22 changed files in this pull request and generated 6 comments.
```python
from joblib import Parallel, delayed
from tqdm import tqdm
```
nomad.stop_detection.utils now imports joblib and tqdm at module import time. These packages are not included in setup.py's install_requires, so a minimal install will fail to import stop-detection modules that depend on utils. Consider either adding joblib/tqdm to package dependencies, or moving these imports inside applyParallel (and only requiring them when n_jobs != 1 / print_progress=True), with a clear error/fallback behavior.
Suggested change:

```python
try:
    from joblib import Parallel, delayed
except ImportError:
    def delayed(func):
        def _delayed(*args, **kwargs):
            return lambda: func(*args, **kwargs)
        return _delayed

    class Parallel:
        def __init__(self, n_jobs=1, **kwargs):
            self.n_jobs = n_jobs

        def __call__(self, tasks):
            return [task() for task in tasks]

try:
    from tqdm import tqdm
except ImportError:
    def tqdm(iterable=None, *args, **kwargs):
        return iterable
```
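An alternative to module-level fallback shims is to defer the imports into the helper itself, as the comment also suggests. Below is a hypothetical sketch of an `applyParallel` with lazy imports; the signature is inferred from the callsites in this PR, and is not necessarily the repo's exact implementation:

```python
import pandas as pd

def applyParallel(grouped, func, n_jobs=1, print_progress=False):
    # Hypothetical sketch: only require joblib when n_jobs != 1 and tqdm
    # when a progress bar is requested, so a minimal install still works.
    groups = list(grouped)
    if print_progress:
        try:
            from tqdm import tqdm
            groups = list(tqdm(groups))
        except ImportError:
            pass  # the progress bar is cosmetic; continue without it
    if n_jobs == 1:
        return [func(group) for _, group in groups]
    try:
        from joblib import Parallel, delayed
    except ImportError as exc:
        raise ImportError("n_jobs != 1 requires joblib") from exc
    return Parallel(n_jobs=n_jobs)(delayed(func)(group) for _, group in groups)

# usage on a toy grouped frame
df = pd.DataFrame({"user_id": ["a", "a", "b"], "x": [1, 2, 3]})
sums = applyParallel(df.groupby("user_id", sort=False), lambda g: int(g["x"].sum()))
print(sums)  # [3, 3]
```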
```python
column_list = [cols[coord_key1], cols[coord_key2], cols[start_t_key], 'duration']

if complete_output:
    column_list.extend([cols[end_t_key], 'diameter', 'n_pings', 'max_gap'])

# Add passthrough columns
column_list.extend(passthrough_cols)
```
summarize_stop(..., complete_output=True) conditionally adds the HA column (traj_cols['ha']) when it exists in the input, but _get_empty_stop_df doesn't include that column in column_list. This breaks the “exact expected columns” guarantee and can lead to schema/dtype differences between empty vs non-empty stop tables. Consider adding the HA column (with an appropriate numeric dtype) when complete_output is enabled and the HA source column is present in input_columns.
```python
    n_jobs=n_jobs,
    print_progress=print_progress,
)
return pd.concat(results, ignore_index=True)
```
pd.concat(results, ignore_index=True) will raise ValueError: No objects to concatenate when data is empty (no user groups). Consider handling data.empty up-front (e.g., return seqscan(data, ...) on the empty df, or utils._get_empty_stop_df(...)) so seqscan_per_user is defined for empty inputs.
```python
    n_jobs=n_jobs,
    print_progress=print_progress,
)
return pd.concat(results, ignore_index=True)
```
pd.concat(results, ignore_index=True) will raise ValueError: No objects to concatenate when data is empty (no user groups). Consider handling data.empty up-front (e.g., return st_hdbscan(data, ...) on the empty df, or utils._get_empty_stop_df(...)) so st_hdbscan_per_user is defined for empty inputs.
```python
)

return pd.concat(results, ignore_index=True)
```
pd.concat(results, ignore_index=True) will raise ValueError: No objects to concatenate when data is empty (no user groups). Consider handling data.empty up-front (e.g., return grid_based(data, ...) on the empty df, or utils._get_empty_stop_df(...)) so grid_based_per_user is defined for empty inputs.
```python
)

return pd.concat(results, ignore_index=True)
```
pd.concat(results, ignore_index=True) will raise ValueError: No objects to concatenate when data is empty (no user groups). Consider handling data.empty up-front (e.g., return lachesis(data, ...) on the empty df, or utils._get_empty_stop_df(...)) so lachesis_per_user is defined for empty inputs.
Implements the per-user stop-detection wrappers, which required debugging empty-output concat and passthrough-column bugs. It also adds tests for these parallel wrappers across all the algorithms.
Fixes #235.
Summary
This PR fixes concat stability issues in per-user label workflows and completes wrapper coverage across clustering stop-detection algorithms.
The core issue: when one user/day slice is empty, label outputs must still keep a stable shape and integer dtype so concatenation does not silently change types.
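The requirement can be seen in a toy pandas example (illustrative only, not the repo's code): keeping the empty slice explicitly typed preserves int64 through concatenation.

```python
import pandas as pd

# Labels from a user with stops, plus a typed empty result for a user with none.
labels = pd.Series([0, 0, 1], name="cluster", dtype="int64")
empty = pd.Series([], name="cluster", dtype="int64")  # typed on purpose

out = pd.concat([labels, empty], ignore_index=True)
print(out.dtype)  # int64 — an untyped empty (float64/object) could upcast this
```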
What changed
- `*_labels` functions:
  - `return_cores=False` -> empty `Series(name='cluster', dtype='int64')`
  - `return_cores=True` -> empty `DataFrame` with `cluster`/`core` int64 columns
- per-user wrappers (`n_jobs`, `print_progress`)

Algorithms covered by per-user wrapper tests
Validation
- `n_jobs=1` and `n_jobs=2`.
- `return_cores=True`/`False` where supported.