Skip to content

Commit d132f48

Browse files
joelfiddes and claude
committed
Add parallel file reading to agg_by_var_fsm
- Use ThreadPoolExecutor for parallel FSM file reading (8 workers default)
- Activates when n_files > 50 (avoids thread overhead for small runs)
- Fix SyntaxWarning: use raw string for regex (r'\s+')
- Fix FutureWarning: use pd.to_datetime instead of deprecated parse_dates

Expected 4-8x speedup for file reading phase with 2000+ clusters.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent dfa6aa1 commit d132f48

1 file changed

Lines changed: 22 additions & 21 deletions

File tree

TopoPyScale/sim_fsm.py

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import sys
1010
import glob
1111
import re
12+
from concurrent.futures import ThreadPoolExecutor
1213
import pandas as pd
1314
import numpy as np
1415
import rasterio
@@ -315,7 +316,7 @@ def fsm_sim(nlstfile, fsm_exec, delete_nlst_files=True):
315316

316317

317318

318-
def agg_by_var_fsm( var='snd', fsm_path = "./outputs/fsm_sims"):
319+
def agg_by_var_fsm(var='snd', fsm_path="./outputs/fsm_sims", n_workers=8):
319320
"""
320321
Function to make single variable multi cluster files as preprocessing step before spatialisation. This is much more efficient than looping over individual simulation files per cluster.
321322
For V variables , C clusters and T timesteps this turns C individual files of dimensions V x T into V individual files of dimensions C x T.
@@ -332,19 +333,16 @@ def agg_by_var_fsm( var='snd', fsm_path = "./outputs/fsm_sims"):
332333
gt50 - ground temperature (50cm depth) degC
333334
334335
fsm_path (str): location of simulation files
335-
Returns:
336+
n_workers (int): number of parallel threads for file reading (default: 8)
337+
Returns:
336338
dataframe
337-
338-
339-
340339
"""
341340

342341
# find all simulation files and natural sort https://en.wikipedia.org/wiki/Natural_sort_order
343342
a = glob.glob(fsm_path+"/sim_*")
344343

345-
if len(a) == 0:
346-
sys.exit("ERROR: " +fsm_path + " does not exist or is empty")
347-
344+
if len(a) == 0:
345+
sys.exit("ERROR: " +fsm_path + " does not exist or is empty")
348346

349347
def natural_sort(l):
350348
def convert(text): return int(text) if text.isdigit() else text.lower()
@@ -358,36 +356,39 @@ def alphanum_key(key): return [convert(c) for c in re.split('([0-9]+)', key)]
358356
'gst':-2,
359357
'gt50':-1}
360358

361-
362-
363359
if var.lower() in ['alb', 'rof', 'snd', 'swe', 'gst', 'gt50']:
364360
ncol = int(fsm_columns.get(var))
365361
else:
366362
print("indicate ncol or var within ['alb', 'rof', 'snd', 'swe', 'gst', 'gt50']")
367363

368364
file_list = natural_sort(a)
369365

370-
mydf = pd.read_csv(file_list[0], sep='\s+', parse_dates=[[0, 1, 2]], header=None)
371-
mydates = mydf.iloc[:, 0]
372-
373-
# can do temp subset here
374-
# startIndex = df[df.iloc[:,0]==str(daYear-1)+"-09-01"].index.values
375-
# endIndex = df[df.iloc[:,0]==str(daYear)+"-09-01"].index.values
366+
mydf = pd.read_csv(file_list[0], sep=r'\s+', header=None)
367+
# Combine date columns (year, month, day) into datetime
368+
mydates = pd.to_datetime(mydf.iloc[:, 0].astype(str) + '-' +
369+
mydf.iloc[:, 1].astype(str) + '-' +
370+
mydf.iloc[:, 2].astype(str))
376371

377372
# all values
378373
startIndex = 0
379374
endIndex = mydf.shape[0]
380375

381-
# efficient way to parse multifile
382-
data = []
383-
for file_path in file_list:
376+
# Read file helper for parallel execution
377+
def read_fsm_file(file_path):
378+
return np.genfromtxt(file_path, usecols=ncol)[int(startIndex):int(endIndex)]
384379

385-
data.append(np.genfromtxt(file_path, usecols=ncol)[int(startIndex):int(endIndex)])
380+
# Parallel file reading - much faster for large cluster counts
381+
n_files = len(file_list)
382+
if n_files > 50 and n_workers > 1:
383+
with ThreadPoolExecutor(max_workers=n_workers) as executor:
384+
data = list(executor.map(read_fsm_file, file_list))
385+
else:
386+
# Sequential for small cluster counts (thread overhead not worth it)
387+
data = [read_fsm_file(f) for f in file_list]
386388

387389
myarray = np.asarray(data) # samples x days
388390
df = pd.DataFrame(myarray.transpose())
389391

390-
391392
# add timestamp
392393
df.insert(0, 'Datetime', mydates)
393394
df = df.set_index("Datetime")

0 commit comments

Comments (0)