Skip to content

Commit 41de5eb

Browse files
committed
Small bench for dask
1 parent b3fc247 commit 41de5eb

1 file changed

Lines changed: 204 additions & 0 deletions

File tree

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
#######################################################################
2+
# Copyright (c) 2019-present, Blosc Development Team <blosc@blosc.org>
3+
# All rights reserved.
4+
#
5+
# This source code is licensed under a BSD-style license (found in the
6+
# LICENSE file in the root directory of this source tree)
7+
#######################################################################
8+
9+
# Compute reductions for different array sizes, using the jit decorator
10+
# and different operands (NumPy and NDArray). Different compression
11+
# levels and codecs can be selected. An optional Dask engine can be benchmarked too.
12+
13+
from time import time
14+
import blosc2
15+
import numpy as np
16+
import sys
17+
import dask
18+
import dask.array as da
19+
import zarr
20+
from numcodecs import Blosc
21+
22+
# Benchmark configuration and command-line handling.
#
# Positional arguments (all optional):
#   argv[1]: an integer compression level, or "numpy" / "numpy_jit" to use
#            NumPy arrays as operands (clevel is then set to 0)
#   argv[2]: codec name (default "LZ4")
#   argv[3]: "dask" to run the Dask engine
niter = 5  # number of timed repetitions for the jit engines
#dtype = np.dtype("float32")
dtype = np.dtype("float64")
clevel = 1
numpy = False
numpy_jit = False
dask_da = False
cparams = cparams_out = None
check_result = False

# For 64 GB RAM
# sizes_numpy = (1, 5, 10, 20, 30, 35, 40, 45, 50, 55)
# sizes_numpy_jit = (1, 5, 10, 20, 30, 35, 40, 45, 50, 55, 60, 65, 70)
# sizes_clevel0 = (1, 5, 10, 20, 30, 35, 40, 45, 50, 55, 60, 65, 70)
# size_list = (1, 5, 10, 20, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95, 100, 105, 110)  # limit clevel>=1 float64

# For 24 GB RAM
sizes_numpy = (1, 5, 10, 20, 30)  # limit numpy float64
sizes_numpy_jit = (1, 5, 10, 20, 30)  # limit numpy float64
sizes_clevel0 = (1, 5, 10, 20, 30)  # limit clevel==0 float64
size_list = (1, 5, 10, 20, 30)

codec = "LZ4"  # default codec
if len(sys.argv) > 2:
    # The codec is later looked up by enum member name (blosc2.Codec[codec])
    # and lower-cased for numcodecs, so accept any letter case here.
    codec = sys.argv[2].upper()
if len(sys.argv) > 1:
    try:
        clevel = int(sys.argv[1])
    except ValueError:
        # Not a number: the first argument selects a NumPy-operand mode.
        clevel = 0
        if sys.argv[1] == "numpy":
            numpy = True
        elif sys.argv[1] == "numpy_jit":
            numpy = True
            numpy_jit = True
        else:
            raise ValueError("Invalid argument")

if check_result:
    print("*** Enabling check_result: beware that this will slow down the benchmarking!")

if len(sys.argv) > 3 and sys.argv[3] == "dask":
    dask_da = True
66+
67+
68+
# The reductions to compute
69+
def compute_reduction_numpy(a, b, c):
    """Reference reduction evaluated with plain NumPy.

    Counts, along axis 1, the elements where ``a ** 3 + sin(2 * a) < c``
    and ``b > 0`` hold simultaneously.

    In this benchmark ``a`` and ``b`` are (N, N) arrays and ``c`` is a
    length-N array that broadcasts against them.
    """
    return np.sum(((a ** 3 + np.sin(a * 2)) < c) & (b > 0), axis=1)
71+
72+
@blosc2.jit
def compute_reduction(a, b, c):
    """Same reduction as ``compute_reduction_numpy``, run through ``blosc2.jit``.

    Counts, along axis 1, the elements where ``a ** 3 + sin(2 * a) < c``
    and ``b > 0`` hold simultaneously. The benchmark calls it with either
    NumPy arrays or blosc2 NDArray operands.
    """
    return np.sum(((a ** 3 + np.sin(a * 2)) < c) & (b > 0), axis=1)
75+
76+
def compute_reduction_dask(a, b, c):
    """Same reduction as ``compute_reduction_numpy``, written with ``dask.array``.

    Uses ``da.sin`` and the array ``.sum(axis=1)`` method; the benchmark
    passes it to ``da.map_blocks`` when the operands are NumPy arrays.
    """
    return (((a ** 3 + da.sin(a * 2)) < c) & (b > 0)).sum(axis=1)
78+
79+
80+
# Compute for both disk or memory
#for disk in (True, False):
for disk in (False,):
    # On-disk operands only make sense for the NDArray (blosc2) engine.
    if disk and (numpy or numpy_jit or dask_da):
        continue
    print(f"\n*** Using disk={disk} ***\n")
    apath = bpath = None
    if numpy:
        print("Using NumPy arrays as operands")
    else:
        print("Using NDArray arrays as operands")
        cparams = cparams_out = blosc2.CParams(clevel=clevel, codec=blosc2.Codec[codec])
        # zcodecs = zcodecs_out = zarr.codecs.BloscCodec(
        #     cname=codec.lower(), clevel=clevel, shuffle=zarr.codecs.BloscShuffle.shuffle)
        # Compressor used for the zarr operands/output in the Dask path.
        zcompressor = zcompressor_out = Blosc(cname=codec.lower(), clevel=clevel, shuffle=Blosc.SHUFFLE)
        # cparams_out = blosc2.CParams(clevel=clevel, codec=blosc2.Codec.LZ4)
        print("Using cparams: ", cparams)
    if disk:
        apath = "a.b2nd"
        bpath = "b.b2nd"

    create_times = []
    compute_times = []
    # Iterate over different sizes
    for n in size_list:
        # Skip sizes that exceed the memory limit of the selected mode.
        if clevel == 0 and n not in sizes_clevel0:
            continue
        if numpy_jit and n not in sizes_numpy_jit:
            continue
        if numpy and not numpy_jit and n not in sizes_numpy:
            continue
        N = n * 1000
        print(f"\nN = {n}000, {dtype=}, size={N ** 2 * 2 * dtype.itemsize / 2**30:.3f} GB")
        chunks = (100, N)
        blocks = (1, N)
        #chunks, blocks = None, None  # automatic chunk and block sizes
        # Lossy compression
        #filters = [blosc2.Filter.TRUNC_PREC, blosc2.Filter.SHUFFLE]
        #filters_meta = [8, 0]  # keep 8 bits of precision in mantissa
        #cparams = blosc2.CParams(clevel=1, codec=blosc2.Codec.LZ4, filters=filters, filters_meta=filters_meta)

        # Create some data operands
        if check_result or dask_da:
            # NumPy copies: reference result for checking, and the source
            # data for the zarr operands in the Dask path (not timed).
            na = np.linspace(0, 1, N * N, dtype=dtype).reshape(N, N)
            nb = na + 1
            nc = np.linspace(-10, 10, N, dtype=dtype)
            nout = compute_reduction_numpy(na, nb, nc)
        t0 = time()
        if numpy or numpy_jit:
            na = np.linspace(0, 1, N * N, dtype=dtype).reshape(N, N)
            nb = na + 1
            nc = np.linspace(-10, 10, N, dtype=dtype)
        elif dask_da:
            # Use zarr for operands
            za = zarr.array(na, chunks=chunks, compressor=zcompressor, zarr_format=2)
            zb = zarr.array(nb, chunks=chunks, compressor=zcompressor, zarr_format=2)
            zc = zarr.array(nc, chunks=chunks[1], compressor=zcompressor, zarr_format=2)
        else:
            a = blosc2.linspace(0, 1, N * N, dtype=dtype, shape=(N, N), cparams=cparams, urlpath=apath, mode="w")
            #print("a.chunks, a.blocks, a.schunk.cratio: ", a.chunks, a.blocks, a.schunk.cratio)
            print(f"{a.chunks=}, {a.blocks=}, {a.schunk.cratio=:.2f}x")

            b = blosc2.linspace(1, 2, N * N, dtype=dtype, shape=(N, N), cparams=cparams, urlpath=bpath, mode="w")
            #b = (a + 1).compute(cparams=cparams, chunks=chunks, blocks=blocks)
            #print(b.chunks, b.blocks, b.schunk.cratio, b.cparams)
            c = blosc2.linspace(-10, 10, N, dtype=dtype, cparams=cparams)  # broadcasting is supported
            #c = blosc2.linspace(-10, 10, N * N, dtype=dtype, shape=(N, N), cparams=cparams)
        t1 = time() - t0
        print(f"Time to create data: {t1:.4f}")
        create_times.append(t1)

        if numpy and not dask_da:
            # "numpy_jit" mode sets both `numpy` and `numpy_jit`, so only
            # `numpy_jit` needs testing here to select the jitted function.
            if numpy_jit:
                # Untimed warm-up call before the timed repetitions.
                out = compute_reduction(na, nb, nc)
                t0 = time()
                for i in range(niter):
                    out = compute_reduction(na, nb, nc)
                t1 = (time() - t0) / niter
                print(f"Time to compute with numpy_jit and NumPy operands: {t1:.4f}")
            else:
                t0 = time()
                nout = compute_reduction_numpy(na, nb, nc)
                t1 = time() - t0
                print(f"Time to compute with NumPy engine: {t1:.4f}")
        elif dask_da:
            niter = 1
            if numpy:
                a = na
                b = nb
                c = nc
            else:
                a = da.from_zarr(za)
                b = da.from_zarr(zb)
                c = da.from_zarr(zc)

            scheduler = "single-threaded" if blosc2.nthreads == 1 else "threads"
            t0 = time()
            for i in range(niter):
                if numpy:
                    dexpr = da.map_blocks(compute_reduction_dask, a, b, c)
                    out = dexpr.compute(scheduler=scheduler)
                else:
                    dexpr = (((a ** 3 + da.sin(a * 2)) < c) & (b > 0)).sum(axis=1)
                    zout = zarr.open(shape=(N,), chunks=chunks[1], dtype=dtype, compressor=zcompressor_out, zarr_format=2)
                    with dask.config.set(scheduler=scheduler, num_workers=blosc2.nthreads):
                        da.to_zarr(dexpr, zout)
                    if check_result and i == 0:
                        out = zout[:]
            t1 = (time() - t0) / niter
            print(f"Time to compute with dask and {clevel=}: {t1:.4f}")
            if check_result:
                np.testing.assert_allclose(out, nout)
        else:
            # out = compute_reduction(a, b, c)
            t0 = time()
            for i in range(niter):
                out = compute_reduction(a, b, c)
            t1 = (time() - t0) / niter
            print(f"Time to compute with blosc2_jit and {clevel=}: {t1:.4f}")
        compute_times.append(t1)
        #del a, b, c

    print("\nCreate times: [", ", ".join([f"{t:.4f}" for t in create_times]), "]")
    print("Compute times: [", ", ".join([f"{t:.4f}" for t in compute_times]), "]")
    print("End of run!\n\n")

0 commit comments

Comments
 (0)