
Parallelized stop detection per user #251

Open
paco-barreras wants to merge 8 commits into main from parallelized-stop-detection-per-user

Conversation

@paco-barreras
Collaborator

Implements the per-user stop-detection wrappers, which required debugging empty-output concat and passthrough-column bugs. It also adds tests covering these parallel wrappers for all the algorithms.

Fixes #235.

Summary

This PR fixes concat stability issues in per-user label workflows and completes wrapper coverage across clustering stop-detection algorithms.

The core issue: when one user/day slice is empty, label outputs must still keep a stable shape and integer dtype so concatenation does not silently change types.
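The stability requirement can be illustrated with a minimal pandas sketch (hypothetical data, not the library's code):

```python
import pandas as pd

# A non-empty user/day slice yields int64 cluster labels.
nonempty = pd.Series([0, 0, 1], name='cluster', dtype='int64')

# A naive empty return with no declared integer dtype may upcast the
# concatenated result (exact behavior varies across pandas versions).
naive_empty = pd.Series(name='cluster', dtype='object')

# A typed empty return keeps the concatenated dtype stable.
typed_empty = pd.Series(name='cluster', dtype='int64')
stable = pd.concat([typed_empty, nonempty], ignore_index=True)
print(stable.dtype)  # int64
```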

What changed

  1. Unified empty stop-table creation into one helper that directly returns a typed empty DataFrame.
  2. Standardized empty returns for all *_labels functions:
    • return_cores=False -> empty Series(name='cluster', dtype='int64')
    • return_cores=True -> empty DataFrame with cluster/core int64 columns
  3. Added missing per-user wrappers for both stop tables and labels, and aligned wrappers around:
    • user-id resolution/validation
    • optional parallel execution (n_jobs, print_progress)
    • deterministic concat output
  4. Expanded per-user regression tests from DBSTOP-only to all clustering algorithms in scope.
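Items 1–2 can be sketched as a single helper (illustrative only; the name and exact signature are not taken from the codebase):

```python
import pandas as pd

def empty_labels(return_cores=False):
    """Return the standardized typed empty output for a *_labels function."""
    if return_cores:
        # Empty DataFrame with int64 cluster/core columns.
        return pd.DataFrame({'cluster': pd.Series(dtype='int64'),
                             'core': pd.Series(dtype='int64')})
    # Empty int64 Series named 'cluster'.
    return pd.Series(name='cluster', dtype='int64')
```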

Algorithms covered by per-user wrapper tests

  1. DBSTOP
  2. TA-DBSCAN
  3. SeqScan
  4. HDBSCAN
  5. Lachesis
  6. Sequential

Validation

  1. Expanded per-user wrapper tests pass for both n_jobs=1 and n_jobs=2.
  2. Empty+non-empty label concat regression passes across all covered label algorithms, including return_cores=True/False where supported.
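The n_jobs=1 vs n_jobs=2 equivalence check can be sketched like this — a stand-in wrapper using the standard library's thread pool rather than the package's actual applyParallel, with hypothetical column names:

```python
from concurrent.futures import ThreadPoolExecutor
import pandas as pd

def stops_per_user(df, n_jobs=1):
    """Stand-in per-user wrapper: summarize each user's pings."""
    groups = [g for _, g in df.groupby('user_id', sort=False)]

    def process(g):
        return g.groupby('user_id', as_index=False)['duration'].sum()

    if n_jobs == 1:
        parts = [process(g) for g in groups]
    else:
        with ThreadPoolExecutor(max_workers=n_jobs) as ex:
            parts = list(ex.map(process, groups))
    return pd.concat(parts, ignore_index=True)

df = pd.DataFrame({'user_id': ['a', 'a', 'b', 'c'],
                   'duration': [5, 7, 3, 9]})
serial = stops_per_user(df, n_jobs=1)
parallel = stops_per_user(df, n_jobs=2)
# Deterministic concat output: both paths must agree exactly.
pd.testing.assert_frame_equal(serial, parallel)
```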

…sion tests

A new per-user regression test exposed a real stop-table concat bug when some users had empty outputs.

This commit hardens empty stop-table construction by deriving exact output columns and explicit dtypes from shared helpers, then using those typed empties in stop-detection paths. It also applies reset_index(drop=True) after grouped stop summarization and adds passthrough guards to avoid duplicate user_id columns.

Per-user regression tests were cleaned up and made faster:
- compare labels directly by (user_id, timestamp)
- remove offset/parts-style expectation logic
- run on a 4-user sample
- parameterize n_jobs with 1 and 2

For now, this is prototyped in dbstop.py and sequential.py via the focused per-user regression path that originally surfaced the bug, with shared helper changes ready for wider consolidation.
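The two hardening steps in that commit — reindexing after grouped summarization and guarding passthrough columns — look roughly like this (illustrative names and data, not the actual dbstop.py code):

```python
import pandas as pd

pings = pd.DataFrame({'user_id': ['a', 'a', 'b'],
                      'duration': [5, 7, 3]})

# reset_index(drop=True) after grouped summarization, so per-group
# indices do not leak into the concatenated stop table.
stops = (pings.groupby('user_id', as_index=False)['duration'].sum()
              .reset_index(drop=True))

# Passthrough guard: only attach columns that are not already present,
# avoiding a duplicate user_id column after the per-user merge.
passthrough_cols = ['user_id', 'device_type']
to_attach = [c for c in passthrough_cols if c not in stops.columns]
print(to_attach)  # ['device_type'] — user_id is already there
```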
Replace split empty-stop schema helpers (column names + dtype map) with one shared helper that directly returns a typed empty stop DataFrame.

Update all active stop-detection summarization callsites (dbstop, dbscan, density_based, hdbscan, lachesis, sequential, grid_based) to use the unified helper, removing duplicated empty-frame construction logic.
@paco-barreras
Collaborator Author

@andresmondragont, I recovered your AWS code that had these implementations of apply_parallel. We're going to need them. Also fixes #237, which you owed.

Contributor

Copilot AI left a comment


Pull request overview

Implements/extends per-user stop-detection wrappers (including optional parallel execution) and stabilizes empty-output schemas to prevent dtype/shape drift when concatenating results across user slices (Fixes #235).

Changes:

  • Added/standardized per-user *_per_user and *_labels_per_user wrappers across clustering-based stop detection algorithms, with n_jobs/print_progress support.
  • Unified empty stop-table creation into a schema/dtype-aware helper (_get_empty_stop_df) and standardized empty returns for label functions (Series vs DataFrame with cluster/core).
  • Expanded regression tests for per-user wrapper correctness and for empty+non-empty label concatenation dtype stability.

Reviewed changes

Copilot reviewed 10 out of 11 changed files in this pull request and generated 15 comments.

Show a summary per file
File Description
nomad/tests/test_stop_detection.py Adds per-user wrapper tests (n_jobs=1/2) and regression coverage for empty label concat schema stability.
nomad/stop_detection/utils.py Introduces shared applyParallel and replaces empty stop-table column inference with typed _get_empty_stop_df.
nomad/stop_detection/sliding.py Removes the legacy sliding staypoint implementation (including its local applyParallel).
nomad/stop_detection/sequential.py Routes parallelization through utils.applyParallel, fixes empty label dtype, and adds detect_stops_labels_per_user.
nomad/stop_detection/lachesis.py Stabilizes empty label dtype, uses _get_empty_stop_df, fixes passthrough handling, and adds lachesis_labels_per_user.
nomad/stop_detection/hdbscan.py Stabilizes empty label dtype, uses _get_empty_stop_df, adds parallelized per-user stop wrapper params, and adds hdbscan_labels_per_user.
nomad/stop_detection/grid_based.py Stabilizes empty label dtype, uses _get_empty_stop_df, parallelizes per-user stop wrapper, and adds grid_based_labels_per_user.
nomad/stop_detection/density_based.py Standardizes empty seqscan_labels outputs and adds seqscan_per_user / seqscan_labels_per_user.
nomad/stop_detection/dbstop.py Standardizes empty dbstop_labels outputs and adds/parallelizes dbstop_per_user / dbstop_labels_per_user.
nomad/stop_detection/dbscan.py Standardizes empty ta_dbscan_labels outputs and adds/parallelizes ta_dbscan_per_user / ta_dbscan_labels_per_user.
.gitignore Ignores a temporary PR description markdown file.


Comment on lines +543 to +547
datetime_keys = ['datetime', 'start_datetime', 'end_datetime']
for key in datetime_keys:
    col = cols.get(key)
    if col in column_list:
        dtype_map[col] = 'datetime64[ns, UTC]'

Copilot AI Apr 10, 2026


_get_empty_stop_df hard-codes datetime columns to dtype 'datetime64[ns, UTC]'. In the default loader.from_df mixed_timezone_behavior='naive' workflow, trajectory datetimes are timezone-naive (datetime64[ns]), so empty stop tables will have a different dtype than non-empty outputs and concatenation can upcast to object. Consider inferring tz-awareness from the input DataFrame dtypes (e.g., pass df/dtypes instead of just columns) or using a timezone-naive dtype for the empty schema when the input datetime column is naive.

Suggested change
datetime_keys = ['datetime', 'start_datetime', 'end_datetime']
for key in datetime_keys:
    col = cols.get(key)
    if col in column_list:
        dtype_map[col] = 'datetime64[ns, UTC]'
input_dtypes = kwargs.get('input_dtypes')
if input_dtypes is None and hasattr(input_columns, 'dtypes'):
    input_dtypes = input_columns.dtypes
datetime_dtype = 'datetime64[ns, UTC]'
if use_datetime and input_dtypes is not None:
    source_datetime_col = cols.get(t_key)
    source_dtype = None
    if source_datetime_col is not None:
        try:
            source_dtype = input_dtypes[source_datetime_col]
        except (KeyError, TypeError, IndexError):
            source_dtype = None
    if source_dtype is not None and pd.api.types.is_datetime64_any_dtype(source_dtype):
        if pd.api.types.is_datetime64tz_dtype(source_dtype):
            datetime_dtype = str(source_dtype)
        else:
            datetime_dtype = 'datetime64[ns]'
datetime_keys = ['datetime', 'start_datetime', 'end_datetime']
for key in datetime_keys:
    col = cols.get(key)
    if col in column_list:
        dtype_map[col] = datetime_dtype

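The tz-awareness inference this comment suggests can be distilled to a small helper (a sketch under the same assumptions, not the suggested diff verbatim):

```python
import pandas as pd

def empty_datetime_dtype(source_dtype):
    """Choose the dtype for an empty stop table's datetime columns,
    matching the tz-awareness of the input trajectory's datetimes."""
    if isinstance(source_dtype, pd.DatetimeTZDtype):
        return str(source_dtype)   # e.g. 'datetime64[ns, UTC]'
    return 'datetime64[ns]'       # timezone-naive input -> naive empty schema

naive = pd.Series(pd.to_datetime(['2026-04-10']))
aware = naive.dt.tz_localize('UTC')
print(empty_datetime_dtype(naive.dtype))  # datetime64[ns]
print(empty_datetime_dtype(aware.dtype))  # datetime64[ns, UTC]
```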
Comment on lines 308 to +316
grouped = data.groupby(uid, sort=False, as_index=False)
results = applyParallel(
    grouped,
    process_user_group,
    n_jobs=n_jobs,
    print_progress=print_progress
)

return pd.concat(results, ignore_index=True)

Copilot AI Apr 10, 2026


detect_stops_per_user unconditionally does pd.concat(results). If data is empty, groupby yields no groups, applyParallel returns an empty list, and pd.concat([]) raises a ValueError. Add an early return for empty input (or handle empty results) that returns a correctly schematized empty stop DataFrame.
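The early-return guard this comment asks for can be sketched generically (hypothetical helper name; the empty factory stands in for the typed empty constructors described earlier):

```python
import pandas as pd

def concat_per_user(parts, empty_factory):
    """Concatenate per-user results, falling back to a typed empty
    result when no groups were produced — pd.concat([]) raises
    ValueError('No objects to concatenate')."""
    parts = list(parts)
    if not parts:
        return empty_factory()
    return pd.concat(parts, ignore_index=True)

empty = concat_per_user([], lambda: pd.DataFrame(
    {'cluster': pd.Series(dtype='int64')}))
print(empty.dtypes['cluster'])  # int64
```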

Comment on lines +351 to +359
grouped = data.groupby(uid, sort=False)
results = applyParallel(
    grouped,
    process_user_group,
    n_jobs=n_jobs,
    print_progress=print_progress,
)

return pd.concat(results).reindex(data.index)

Copilot AI Apr 10, 2026


detect_stops_labels_per_user unconditionally does pd.concat(results). If data is empty, results is empty and pd.concat([]) raises a ValueError. Add an early return for empty input (or empty results) that returns an empty Series(name='cluster', dtype='int64') aligned to the input index.

Comment on lines 256 to +265
# Use applyParallel to process groups in parallel
grouped = data.groupby(uid, sort=False)
results = applyParallel(
    grouped,
    process_user_group,
    n_jobs=n_jobs,
    print_progress=print_progress
)

return pd.concat(results, ignore_index=True)

Copilot AI Apr 10, 2026


lachesis_per_user unconditionally does pd.concat(results). If data is empty, results is empty and pd.concat([]) raises a ValueError. Add an early return for empty input (or empty results) that returns a schema-correct empty stop DataFrame.

Comment on lines +298 to +306
grouped = data.groupby(uid, sort=False)
results = applyParallel(
    grouped,
    process_user_group,
    n_jobs=n_jobs,
    print_progress=print_progress,
)

return pd.concat(results).reindex(data.index)

Copilot AI Apr 10, 2026


lachesis_labels_per_user unconditionally does pd.concat(results). If data is empty, results is empty and pd.concat([]) raises a ValueError. Add an early return for empty input (or empty results) that returns an empty Series(name='cluster', dtype='int64') aligned to the input index.

Comment on lines +398 to +406
grouped = data.groupby(uid, sort=False)
results = utils.applyParallel(
    grouped,
    process_user_group,
    n_jobs=n_jobs,
    print_progress=print_progress,
)

return pd.concat(results).reindex(data.index)

Copilot AI Apr 10, 2026


seqscan_labels_per_user unconditionally does pd.concat(results). If data is empty, results is empty and pd.concat([]) raises a ValueError. Add an early return for empty input (or empty results) that returns the appropriate typed empty output (Series or cluster/core DataFrame depending on return_cores) aligned to the input index.

Comment on lines +271 to 278
grouped = data.groupby(uid, sort=False, as_index=False)
results = utils.applyParallel(
    grouped,
    process_user_group,
    n_jobs=n_jobs,
    print_progress=print_progress
)
return pd.concat(results, ignore_index=True)

Copilot AI Apr 10, 2026


dbstop_per_user unconditionally does pd.concat(results). If data is empty, results is empty and pd.concat([]) raises a ValueError. Add an early return for empty input (or empty results) that returns a schema-correct empty stop DataFrame (including the user_id passthrough column).

Comment on lines +313 to +321
grouped = data.groupby(uid, sort=False)
results = utils.applyParallel(
    grouped,
    process_user_group,
    n_jobs=n_jobs,
    print_progress=print_progress
)

return pd.concat(results).reindex(data.index)

Copilot AI Apr 10, 2026


dbstop_labels_per_user unconditionally does pd.concat(results). If data is empty, results is empty and pd.concat([]) raises a ValueError. Add an early return for empty input (or empty results) that returns the appropriate typed empty output (Series or cluster/core DataFrame depending on return_cores) aligned to the input index.

Comment on lines +304 to 311
grouped = data.groupby(uid, sort=False, as_index=False)
results = utils.applyParallel(
    grouped,
    process_user_group,
    n_jobs=n_jobs,
    print_progress=print_progress,
)
return pd.concat(results, ignore_index=True)

Copilot AI Apr 10, 2026


ta_dbscan_per_user unconditionally does pd.concat(results). If data is empty, results is empty and pd.concat([]) raises a ValueError. Add an early return for empty input (or empty results) that returns a schema-correct empty stop DataFrame (including the user_id passthrough column).

Comment on lines +348 to +356
grouped = data.groupby(uid, sort=False)
results = utils.applyParallel(
    grouped,
    process_user_group,
    n_jobs=n_jobs,
    print_progress=print_progress,
)

return pd.concat(results).reindex(data.index)

Copilot AI Apr 10, 2026


ta_dbscan_labels_per_user unconditionally does pd.concat(results). If data is empty, results is empty and pd.concat([]) raises a ValueError. Add an early return for empty input (or empty results) that returns the appropriate typed empty output (Series or cluster/core DataFrame depending on return_cores) aligned to the input index.

Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 19 out of 22 changed files in this pull request and generated 6 comments.



Comment on lines +15 to 17
from joblib import Parallel, delayed
from tqdm import tqdm


Copilot AI Apr 10, 2026


nomad.stop_detection.utils now imports joblib and tqdm at module import time. These packages are not included in setup.py's install_requires, so a minimal install will fail to import stop-detection modules that depend on utils. Consider either adding joblib/tqdm to package dependencies, or moving these imports inside applyParallel (and only requiring them when n_jobs != 1 / print_progress=True), with a clear error/fallback behavior.

Suggested change
from joblib import Parallel, delayed
from tqdm import tqdm
try:
    from joblib import Parallel, delayed
except ImportError:
    def delayed(func):
        def _delayed(*args, **kwargs):
            return lambda: func(*args, **kwargs)
        return _delayed

    class Parallel:
        def __init__(self, n_jobs=1, **kwargs):
            self.n_jobs = n_jobs
        def __call__(self, tasks):
            return [task() for task in tasks]

try:
    from tqdm import tqdm
except ImportError:
    def tqdm(iterable=None, *args, **kwargs):
        return iterable

Comment on lines 535 to 539
column_list = [cols[coord_key1], cols[coord_key2], cols[start_t_key], 'duration']

if complete_output:
    column_list.extend([cols[end_t_key], 'diameter', 'n_pings', 'max_gap'])

# Add passthrough columns
column_list.extend(passthrough_cols)

Copilot AI Apr 10, 2026


summarize_stop(..., complete_output=True) conditionally adds the HA column (traj_cols['ha']) when it exists in the input, but _get_empty_stop_df doesn't include that column in column_list. This breaks the “exact expected columns” guarantee and can lead to schema/dtype differences between empty vs non-empty stop tables. Consider adding the HA column (with an appropriate numeric dtype) when complete_output is enabled and the HA source column is present in input_columns.

    n_jobs=n_jobs,
    print_progress=print_progress,
)
return pd.concat(results, ignore_index=True)

Copilot AI Apr 10, 2026


pd.concat(results, ignore_index=True) will raise ValueError: No objects to concatenate when data is empty (no user groups). Consider handling data.empty up-front (e.g., return seqscan(data, ...) on the empty df, or utils._get_empty_stop_df(...)) so seqscan_per_user is defined for empty inputs.

    n_jobs=n_jobs,
    print_progress=print_progress,
)
return pd.concat(results, ignore_index=True)

Copilot AI Apr 10, 2026


pd.concat(results, ignore_index=True) will raise ValueError: No objects to concatenate when data is empty (no user groups). Consider handling data.empty up-front (e.g., return st_hdbscan(data, ...) on the empty df, or utils._get_empty_stop_df(...)) so st_hdbscan_per_user is defined for empty inputs.

)

return pd.concat(results, ignore_index=True)

Copilot AI Apr 10, 2026


pd.concat(results, ignore_index=True) will raise ValueError: No objects to concatenate when data is empty (no user groups). Consider handling data.empty up-front (e.g., return grid_based(data, ...) on the empty df, or utils._get_empty_stop_df(...)) so grid_based_per_user is defined for empty inputs.

)

return pd.concat(results, ignore_index=True)

Copilot AI Apr 10, 2026


pd.concat(results, ignore_index=True) will raise ValueError: No objects to concatenate when data is empty (no user groups). Consider handling data.empty up-front (e.g., return lachesis(data, ...) on the empty df, or utils._get_empty_stop_df(...)) so lachesis_per_user is defined for empty inputs.

@paco-barreras paco-barreras requested a review from Copilot April 15, 2026 03:04
Contributor

Copilot AI left a comment


Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

@paco-barreras paco-barreras requested a review from Copilot April 15, 2026 03:05
Contributor

Copilot AI left a comment


Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

@paco-barreras paco-barreras requested a review from Copilot April 15, 2026 03:28
Contributor

Copilot AI left a comment


Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

@paco-barreras paco-barreras requested a review from Copilot April 15, 2026 03:39
Contributor

Copilot AI left a comment


Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.



Development

Successfully merging this pull request may close these issues.

parallelized implementation for all algorithms (sequential, lachesis, st-dbscan, seqscan)

2 participants