-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathutils.py
More file actions
162 lines (127 loc) · 4.87 KB
/
utils.py
File metadata and controls
162 lines (127 loc) · 4.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
"""
Utilities
"""
from typing import Hashable, Iterable, Iterator, Optional
import numpy as np
import xarray as xr
from xarray import DataArray, Dataset
NEMO_NONE = 999_999
def _is_nemo_none(var: Hashable) -> bool:
    """Check whether a NEMO namelist value should be treated as unset."""
    # Falsy values (None, 0, "") coalesce to the sentinel in the original
    # `(var or NEMO_NONE) == NEMO_NONE` idiom, so they count as "none"
    # alongside the explicit NEMO sentinel value itself.
    if not var:
        return True
    return var == NEMO_NONE
def _are_nemo_none(var: Iterable) -> Iterator[bool]:
    """Lazily map each namelist parameter to whether it is a NEMO "none" value."""
    # `map` is constructed on first iteration (we are inside a generator
    # function), so laziness matches the original explicit yield loop.
    yield from map(_is_nemo_none, var)
# -----------------------------------------------------------------------------
def _calc_rmax(depth: DataArray) -> DataArray:
    """
    Calculate rmax: measure of steepness

    This function returns the slope parameter

        rmax = abs(Hb - Ha) / (Ha + Hb)

    where Ha and Hb are the depths of adjacent grid cells (Mellor et al 1998),
    taken as the element-wise maximum over all horizontal dimensions of ``depth``.

    Reference:
    *) Mellor, Oey & Ezer, J Atm. Oce. Tech. 15(5):1122-1131, 1998.

    Parameters
    ----------
    depth: DataArray
        Bottom depth (units: m).

    Returns
    -------
    DataArray
        Maximum slope parameter at each grid point (units: None).
    """
    # !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    # Do we actually need this? mypy complains since
    # DataArray.reset_indexe() returns Optional["DataArray"]
    #
    # depth = depth.reset_index(list(depth.dims))
    both_rmax = []
    for dim in depth.dims:
        # (Hb - Ha) / (Ha + Hb)
        depth_diff = depth.diff(dim)
        depth_rolling_sum = depth.rolling({dim: 2}).sum().dropna(dim)
        rmax = depth_diff / depth_rolling_sum

        # (rmax_a + rmax_b) / 2
        rmax = rmax.rolling({dim: 2}).mean().dropna(dim)

        # Fill first and last row/column (lost to the two pair-wise ops above)
        rmax = rmax.pad({dim: (1, 1)}, constant_values=0)

        both_rmax.append(np.abs(rmax))

    # Element-wise maximum across all dimensions, folded pair-wise.
    # NOTE: `np.maximum(*both_rmax)` is wrong for any dim count other than 2:
    # with a single dimension it raises (np.maximum needs two operands), and
    # with three the third array is silently passed as the positional `out`
    # argument of the ufunc.
    result = both_rmax[0]
    for other in both_rmax[1:]:
        result = np.maximum(result, other)
    return result
# -----------------------------------------------------------------------------
def generate_cartesian_grid(
    ppe1_m,
    ppe2_m,
    jpiglo: Optional[int] = None,
    jpjglo: Optional[int] = None,
    ppglam0: float = 0,
    ppgphi0: float = 0,
    chunks: Optional[dict] = None,
) -> Dataset:
    """
    Generate coordinates and spacing of a NEMO Cartesian grid.

    Parameters
    ----------
    ppe1_m, ppe2_m: float, 1D array-like
        Grid spacing along x/y axis (units: m).
    jpiglo, jpjglo: int, optional
        Size of x/y dimension.
    ppglam0, ppgphi0: float
        x/y coordinate of first T-point (units: m).
    chunks: dict, optional
        Chunk sizes along each dimension (e.g., ``{"x": 5, "y": 5}``).
        Requires ``dask`` installed.

    Returns
    -------
    Dataset
        Equivalent of NEMO coordinates file.

    Raises
    ------
    ValueError
        If ``ppe{1,2}_m`` is a vector and ``jp{i,j}glo`` is specified, or viceversa.

    Notes
    -----
    Vectors are loaded into memory. If ``chunks`` is specified, 2D arrays are coerced
    into dask arrays before broadcasting.
    """
    ds = Dataset()
    # The x and y axes are built symmetrically: same logic, axis-specific names.
    for dim, ppe, jp, ppg in zip(
        ["x", "y"], [ppe1_m, ppe2_m], [jpiglo, jpjglo], [ppglam0, ppgphi0]
    ):
        # Check and convert ppe to numpy array
        ppe = np.asarray(ppe, dtype=float)
        # A scalar spacing requires an explicit size (jp); a vector implies its
        # own size, so exactly one of the two must be given.
        # NOTE(review): `jp` is tested for truthiness, so `jp == 0` is treated
        # the same as unspecified — confirm this is intended.
        if (ppe.shape and jp) or (not ppe.shape and not jp):
            raise ValueError(
                "`jp{i,j}glo` must be specified"
                " if and only if `ppe{1,2}_m` is not a vector."
            )

        # c: center f:face
        # Scalar spacing is expanded to a uniform vector of length jp.
        delta_c = DataArray(ppe if ppe.shape else ppe.repeat(jp), dims=dim)
        # Face coordinates: running sum of spacings, shifted so the first
        # T-point sits at ppg (half a cell before the first face).
        coord_f = delta_c.cumsum(dim) + (ppg - 0.5 * delta_c[0])
        # Center coordinates: midpoint of adjacent faces; the first element,
        # NaN after the rolling mean, is filled with ppg.
        coord_c = coord_f.rolling({dim: 2}).mean().fillna(ppg)
        # Face spacing: distance between adjacent centers; the element lost to
        # diff is padded at the end with the last center spacing.
        delta_f = coord_c.diff(dim).pad({dim: (0, 1)}, constant_values=delta_c[-1])

        # Add attributes
        for da in [coord_c, coord_f]:
            da.attrs = dict(
                units="m", long_name=f"{dim}-coordinate in Cartesian system"
            )
        for da in [delta_c, delta_f]:
            da.attrs = dict(units="m", long_name=f"{dim}-axis spacing")

        # Fill dataset
        # NEMO naming: e1/e2 = spacings, glam/gphi = coordinates; suffixes
        # t/u/v/f mark the staggered-grid point each variable lives on.
        eprefix = "e" + ("1" if dim == "x" else "2")
        gprefix = "g" + ("lam" if dim == "x" else "phi")
        nav_coord = "nav_" + ("lon" if dim == "x" else "lat")
        vel_c = "v" if dim == "x" else "u"
        vel_f = "v" if dim == "y" else "u"
        # T-points and the velocity point centered along this axis share the
        # center values; F-points and the other velocity point share faces.
        ds[nav_coord] = ds[gprefix + "t"] = ds[gprefix + vel_c] = coord_c
        ds[gprefix + "f"] = ds[gprefix + vel_f] = coord_f
        ds[eprefix + "t"] = ds[eprefix + vel_c] = delta_c
        ds[eprefix + "f"] = ds[eprefix + vel_f] = delta_f

        # Upgrade dimension to coordinate so we can add CF-attributes
        ds[dim] = ds[dim]
        ds[dim].attrs = dict(axis=dim.upper(), long_name=f"{dim}-dimension index")

    # Generate 2D coordinates (create dask arrays before broadcasting).
    # Order dims (y, x) for convenience (e.g., for plotting).
    (ds,) = xr.broadcast(ds if chunks is None else ds.chunk(chunks))
    ds = ds.transpose(*("y", "x"))

    # Everything in this dataset is a coordinate variable, matching the
    # structure of a NEMO coordinates file.
    return ds.set_coords(ds.variables)