Skip to content

Commit 1c217a8

Browse files
committed
added functions for nc-zarr conversion
1 parent 5c6a9b7 commit 1c217a8

1 file changed

Lines changed: 81 additions & 124 deletions

File tree

TopoPyScale/fetch_era5.py

Lines changed: 81 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import xarray as xr
1111
import numpy as np
1212
import dask
13+
from dask.distributed import Client
14+
1315
try:
1416
from zarr.codecs import BloscCodec
1517
except ImportError:
@@ -19,6 +21,7 @@
1921
except ImportError:
2022
# Fallback for older versions or different structure
2123
from zarr import Blosc as BloscCodec
24+
2225
from datetime import datetime
2326

2427
import cdsapi, os, sys
@@ -258,27 +261,65 @@ def to_zarr(self):
258261
)
259262

260263

264+
def convert_yearly_nc_to_zarr(path_to_netcdfs='inputs/climate/yearly',
                              zarr_store='inputs/climate/ERA5.zarr',
                              chunks={'latitude':3, 'longitude':3, 'time':8760},
                              compressor=None,
                              plev_name='PLEV_*.nc',
                              surf_name='SURF_*.nc'):
    """
    Function to convert a stack of yearly netcdf files to a Zarr store. Combines SURF and PLEV dataset.

    Args:
        path_to_netcdfs (str): directory containing the yearly SURF and PLEV netcdf files
        zarr_store (str): path of the Zarr store to create (overwritten if it exists)
        chunks (dict): chunk sizes per dimension for the Zarr store
        compressor (obj): compressor object used for encoding. Default: BloscCodec(cname='lz4', clevel=5, shuffle='bitshuffle', blocksize=0)
        plev_name (str): glob pattern of the pressure-level files. Use * instead of the year
        surf_name (str): glob pattern of the surface files. Use * instead of the year

    Raises:
        FileNotFoundError: if no SURF or PLEV file matches the patterns
        ValueError: if the number of SURF and PLEV files differ
    """
    dwd = Path(path_to_netcdfs)
    surface_files = sorted(dwd.glob(surf_name))
    pressure_files = sorted(dwd.glob(plev_name))

    # Fail early with a clear message instead of an IndexError on surface_files[0],
    # and refuse silently-truncating zip() when the file lists are mismatched.
    if not surface_files or not pressure_files:
        raise FileNotFoundError(
            f"No files matching {surf_name!r} / {plev_name!r} found in {path_to_netcdfs}")
    if len(surface_files) != len(pressure_files):
        raise ValueError(
            f"Mismatched file counts: {len(surface_files)} SURF vs {len(pressure_files)} PLEV")

    if compressor is None:
        compressor = BloscCodec(cname='lz4', clevel=5, shuffle='bitshuffle', blocksize=0)

    # Write the merged first year and create the initial Zarr store
    with xr.open_dataset(surface_files[0]) as surface_ds, xr.open_dataset(pressure_files[0]) as pressure_ds:
        combined_ds = xr.merge([surface_ds, pressure_ds], compat='override')
        encoder = {var: {"compressors": compressor} for var in combined_ds}
        combined_ds.chunk(chunks).to_zarr(zarr_store, mode="w", encoding=encoder)

    # Append remaining years along the time dimension
    for surf_file, pres_file in zip(surface_files[1:], pressure_files[1:]):
        with xr.open_dataset(surf_file) as surface_ds, xr.open_dataset(pres_file) as pressure_ds:
            # year is assumed encoded as the last 4 chars of the file stem, e.g. SURF_2019.nc
            print(f"---> Appending year {surf_file.name.split('.')[0][-4:]}")
            combined_ds = xr.merge([surface_ds, pressure_ds], compat='override')
            combined_ds.chunk(chunks).to_zarr(zarr_store, append_dim="time")

    print(f"===> Combined Zarr store saved to {zarr_store}")
296+
297+
298+
def append_year_to_zarr(path_to_netcdfs='inputs/climate/yearly',
                        zarr_store='inputs/climate/ERA5.zarr',
                        chunks={'latitude':3, 'longitude':3, 'time':8760},
                        compressor=None,
                        plev_files=[],
                        surf_files=[]):
    """
    Function to append years to an existing Zarr store.

    Args:
        path_to_netcdfs (str): directory containing the yearly netcdf files (kept for interface compatibility; files are given explicitly below)
        zarr_store (str): path of the existing Zarr store to append to
        chunks (dict): chunk sizes per dimension for the appended data
        compressor (obj): kept for interface compatibility; appended data reuses the store's existing encoding
        plev_files (list): pressure-level netcdf files to append, ordered by year
        surf_files (list): surface netcdf files to append, ordered by year (must pair with plev_files)
    """
    # Track which years were appended, for the summary message below.
    # Bug fix: lyear was never initialized (NameError on first append).
    lyear = []

    # Bug fix: the loop iterated undefined names surface_files/pressure_files;
    # the actual parameters are surf_files/plev_files.
    for surf_file, pres_file in zip(surf_files, plev_files):
        with xr.open_dataset(surf_file) as surface_ds, xr.open_dataset(pres_file) as pressure_ds:
            # year is assumed encoded as the last 4 chars of the file stem, e.g. SURF_2019.nc
            lyear.append(surf_file.name.split('.')[0][-4:])
            print(f"---> Appending year {surf_file.name.split('.')[0][-4:]}")
            combined_ds = xr.merge([surface_ds, pressure_ds], compat='override')
            combined_ds.chunk(chunks).to_zarr(zarr_store, append_dim="time")
    print(f"===> Years {lyear} appended to the Zarr store {zarr_store}")
265315

266-
def append_nc_to_zarr_region(year, ncfile, zarrstore, surf=False):
267-
ncfile = str(ncfile)
268-
print(f"---> Appending {ncfile} to {zarrstore}")
269-
dp = xr.open_dataset(ncfile, chunks='auto')
270-
if surf:
271-
dp = dp.rename({'z':'z_surf'})
272-
dp.to_zarr(zarrstore, mode='a',region='auto', align_chunks=True)
273-
dp = None
274316

275-
def convert_netcdf_stack_to_zarr(path_to_netcdfs='inputs/climate/yearly',
317+
def convert_daily_nc_to_zarr(path_to_netcdfs='inputs/climate/yearly',
                             zarrout='inputs/climate/ERA5.zarr',
                             chunks={'latitude':3, 'longitude':3, 'time':8760, 'level':13},
                             compressor=None,
                             plev_name='dPLEV_*.nc',
                             surf_name='dSURF_*.nc',
                             n_cores=4):
    """
    Function to convert a stack of netcdf PLEV and SURF files to a zarr store. Optimizing chunking based on auto detection by dask.

    Args:
        path_to_netcdfs (str): directory containing the daily SURF and PLEV netcdf files
        zarrout (str): path of the Zarr store to create (overwritten if it exists)
        chunks (dict): chunk sizes per dimension for the Zarr store
        compressor (obj): Compressor object to use for encoding. See Zarr compression. Default: BloscCodec(cname='lz4', clevel=5, shuffle='bitshuffle', blocksize=0)
        plev_name (str): file pattern of the netcdf containing the pressure levels. Use * instead of the year
        surf_name (str): file pattern of the netcdf containing the surface level. Use * instead of the year
        n_cores (int): number of cores on which to distribute the load
    """
    client = Client(n_workers=n_cores, threads_per_worker=1, dashboard_address=':8787')
    try:
        dwd = Path(path_to_netcdfs)
        surface_files = sorted(dwd.glob(surf_name))
        pressure_files = sorted(dwd.glob(plev_name))

        if compressor is None:
            compressor = BloscCodec(cname='lz4', clevel=5, shuffle='bitshuffle', blocksize=0)

        # Bug fix: 'chuncks' was an undefined name (typo of the 'chunks' parameter).
        surface_ds = xr.open_mfdataset(
            surface_files,
            combine="by_coords",
            parallel=True,
            chunks={"time": chunks.get("time")}  # chunk time dimension to 1 year
        )

        pressure_ds = xr.open_mfdataset(
            pressure_files,
            combine="by_coords",
            parallel=True,
            chunks={"time": chunks.get("time")}  # chunk time dimension to 1 year
        )

        # Rechunk spatial dimensions
        surface_ds = surface_ds.chunk({"latitude": chunks.get("latitude"), "longitude": chunks.get("longitude")})
        pressure_ds = pressure_ds.chunk({"latitude": chunks.get("latitude"), "longitude": chunks.get("longitude")})

        combined_ds = xr.merge([surface_ds, pressure_ds], compat='override')
        encoding = {var: {"compressors": [compressor]} for var in combined_ds.data_vars}
        # Bug fix: 'zarr_store' was undefined here — the output-path parameter is 'zarrout'.
        combined_ds.to_zarr(zarrout, mode="w", encoding=encoding, compute=True)
        print(f"===> Combined NC files to Zarr store: {zarrout}")
    finally:
        # Always release the dask workers, even if the conversion fails.
        client.close()
411368

412369

413370
def to_zarr(ds, fout, chuncks={'x':3, 'y':3, 'time':8760, 'level':7}, compressor=None, mode='w'):

0 commit comments

Comments
 (0)