|
| 1 | +# Retrieve data from the National Water Quality Assessment Program (NAWQA) |
| 2 | + |
import math
import os
from functools import wraps
from random import randint
from time import sleep

import lithops
import pandas as pd
from dataretrieval import nldi, nwis, wqp
| 11 | + |
# S3 bucket for the output parquet dataset; must be set in the environment
DESTINATION_BUCKET = os.environ.get('DESTINATION_BUCKET')
# WQP project name used to filter sample results
PROJECT = "National Water Quality Assessment Program (NAWQA)"
# some sites are not found in NLDI, avoid them for now
NOT_FOUND_SITES = [
    "15565447",  # "USGS-"
    "15292700",
]
# sites whose NLDI geometry appears unusable -- TODO(review): confirm why these fail
BAD_GEOMETRY_SITES = [
    "06805500",
    "09306200",
]

# union of problem sites: the NLDI neighbor search is skipped for all of these
BAD_NLDI_SITES = NOT_FOUND_SITES + BAD_GEOMETRY_SITES
| 25 | + |
| 26 | + |
def map_retrieval(site):
    """Map function to pull data from NWIS and WQP.

    Queries the Water Quality Portal for samples from the given site and
    its neighboring sites, then appends the results to a partitioned
    parquet dataset on S3.

    Parameters
    ----------
    site : str
        8-digit USGS site number.
    """
    print(f"Retrieving samples from site {site}")
    # skip the NLDI neighbor search for sites known to fail
    if site in BAD_NLDI_SITES:
        site_list = [site]
    # else query slowly: stagger requests so parallel workers don't hammer NLDI
    else:
        sleep(randint(0, 5))
        site_list = find_neighboring_sites(site)

    # reformat for wqp ("USGS-" agency prefix)
    site_list = [f"USGS-{s}" for s in site_list]

    df, _ = wqp_get_results(siteid=site_list,
                            project=PROJECT,
                            )

    # explicit empty-result guard, rather than relying on the write failing
    if df.empty:
        print(f"No samples returned from site {site}")
        return

    try:
        # merge sites: label every neighboring-site sample with the original
        # search site so the parquet dataset is partitioned by NWQN site
        df['MonitoringLocationIdentifier'] = f"USGS-{site}"
        df.astype(str).to_parquet(f's3://{DESTINATION_BUCKET}/nwqn-samples.parquet',
                                  engine='pyarrow',
                                  partition_cols=['MonitoringLocationIdentifier'],
                                  compression='zstd')
        # optionally, `return df` for further processing

    except Exception as e:
        # best-effort by design: log and continue so one bad site
        # doesn't abort the whole map job
        print(f"No samples returned from site {site}: {e}")
| 56 | + |
| 57 | + |
def exponential_backoff(max_retries=5, base_delay=1):
    """Exponential backoff decorator with configurable retries and base delay.

    Parameters
    ----------
    max_retries : int, optional
        Maximum number of retries before the last exception propagates.
    base_delay : int or float, optional
        Base delay in seconds; retry ``k`` waits ``base_delay * 2**k``.
    """
    def decorator(func):
        @wraps(func)  # preserve the wrapped function's name/docstring
        def wrapper(*args, **kwargs):
            attempts = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception:
                    attempts += 1
                    if attempts > max_retries:
                        # bare raise keeps the original traceback intact
                        raise
                    wait_time = base_delay * (2 ** attempts)
                    print(f"Retrying in {wait_time} seconds...")
                    sleep(wait_time)
        return wrapper
    return decorator
| 75 | + |
| 76 | + |
@exponential_backoff(max_retries=5, base_delay=1)
def nwis_get_info(*args, **kwargs):
    """Call ``nwis.get_info`` with retries (exponential backoff on any error)."""
    return nwis.get_info(*args, **kwargs)
| 80 | + |
| 81 | + |
@exponential_backoff(max_retries=5, base_delay=1)
def wqp_get_results(*args, **kwargs):
    """Call ``wqp.get_results`` with retries (exponential backoff on any error)."""
    return wqp.get_results(*args, **kwargs)
| 85 | + |
| 86 | + |
@exponential_backoff(max_retries=3, base_delay=1)
def find_neighboring_sites(site, search_factor=0.1, fudge_factor=3.0):
    """Find sites upstream and downstream of the given site within a certain distance.

    TODO Use geoconnex to determine mainstem length

    Parameters
    ----------
    site : str
        8-digit site number.
    search_factor : float, optional
        The factor by which to multiply the watershed length to determine the
        search distance.
    fudge_factor : float, optional
        An additional fudge factor to apply to the search distance, because
        watersheds are not circular.

    Returns
    -------
    list of str
        Neighboring site numbers, always including ``site`` itself.
    """
    site_df, _ = nwis_get_info(sites=site)
    drain_area_sq_mi = site_df["drain_area_va"].values[0]
    length = _estimate_watershed_length_km(drain_area_sq_mi)
    search_distance = length * search_factor * fudge_factor
    # clip between 1 and 9999km
    search_distance = max(1.0, min(9999.0, search_distance))

    # get upstream ("UM") and downstream ("DM") sites along the mainstem
    gdfs = [
        nldi.get_features(
            feature_source="WQP",
            feature_id=f"USGS-{site}",
            navigation_mode=mode,
            distance=search_distance,
            data_source="nwissite",
        )
        for mode in ["UM", "DM"]  # upstream and downstream
    ]

    features = pd.concat(gdfs, ignore_index=True)

    # remove the "USGS-" agency prefix. NOTE: str.strip("USGS-") would strip
    # any of the characters {U, S, G, -} from BOTH ends, not the prefix --
    # removeprefix is the correct operation
    df, _ = nwis_get_info(sites=list(features.identifier.str.removeprefix("USGS-")))
    # drop sites with dissimilar drainage areas
    df = df.where(
        (df["drain_area_va"] / drain_area_sq_mi) > search_factor,
    ).dropna(how="all")

    site_list = df["site_no"].to_list()

    # include the original search site among the neighbors
    if site not in site_list:
        site_list.append(site)

    return site_list
| 138 | + |
| 139 | + |
| 140 | +def _estimate_watershed_length_km(drain_area_sq_mi): |
| 141 | + """Estimate the diameter assuming a circular watershed. |
| 142 | +
|
| 143 | + Parameters |
| 144 | + ---------- |
| 145 | + drain_area_sq_mi : float |
| 146 | + The drainage area in square miles. |
| 147 | +
|
| 148 | + Returns |
| 149 | + ------- |
| 150 | + float |
| 151 | + The diameter of the watershed in kilometers. |
| 152 | + """ |
| 153 | + # assume a circular watershed |
| 154 | + length_miles = 2 * (drain_area_sq_mi / math.pi) ** 0.5 |
| 155 | + # convert from miles to km |
| 156 | + return length_miles * 1.60934 |
| 157 | + |
| 158 | + |
if __name__ == "__main__":
    # NWQN site list; '#' lines are comments, and site IDs must be read as
    # strings so leading zeros are preserved
    site_df = pd.read_csv(
        'NWQN_sites.csv',
        comment='#',
        dtype={'SITE_QW_ID': str, 'SITE_FLOW_ID': str},
    )

    site_list = site_df['SITE_QW_ID'].to_list()
    #site_list = site_list[:2] # prune for testing

    # fan out one retrieval task per site on the serverless executor
    fexec = lithops.FunctionExecutor(config_file="lithops.yaml")
    futures = fexec.map(map_retrieval, site_list)

    # block until all retrieval tasks complete
    futures.get_result()