
Commit c9fbab8

Add NAWQA data pull demo (#151)
Add demo using lithops to query NAWQA data. --------- Co-authored-by: thodson <thodson@usgs.gov>
1 parent d3865a2 commit c9fbab8

5 files changed

Lines changed: 237 additions & 0 deletions


demos/nawqa_data_pull/Dockerfile_dataretrieval

Lines changed: 59 additions & 0 deletions

```dockerfile
# Python 3.11
FROM python:3.11-slim-buster

RUN apt-get update \
    # Install aws-lambda-cpp build dependencies
    && apt-get install -y \
        g++ \
        make \
        cmake \
        unzip \
    # cleanup package lists, they are not used anymore in this image
    && rm -rf /var/lib/apt/lists/* \
    && apt-cache search linux-headers-generic

ARG FUNCTION_DIR="/function"

# Copy function code
RUN mkdir -p ${FUNCTION_DIR}

# Update pip
# NB botocore/boto3 are pinned due to https://github.com/boto/boto3/issues/3648
# using versions from https://github.com/aio-libs/aiobotocore/blob/72b8dd5d7d4ef2f1a49a0ae0c37b47e5280e2070/setup.py
# due to s3fs dependency
RUN pip install --upgrade --ignore-installed pip wheel six setuptools \
    && pip install --upgrade --no-cache-dir --ignore-installed \
        awslambdaric \
        botocore==1.29.76 \
        boto3==1.26.76 \
        redis \
        httplib2 \
        requests \
        numpy \
        scipy \
        pandas \
        pika \
        kafka-python \
        cloudpickle \
        ps-mem \
        tblib

# Set working directory to function root directory
WORKDIR ${FUNCTION_DIR}

# Add Lithops
COPY lithops_lambda.zip ${FUNCTION_DIR}
RUN unzip lithops_lambda.zip \
    && rm lithops_lambda.zip \
    && mkdir handler \
    && touch handler/__init__.py \
    && mv entry_point.py handler/

# Put your dependencies here, using RUN pip install... or RUN apt install...
COPY requirements.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

ENTRYPOINT [ "/usr/local/bin/python", "-m", "awslambdaric" ]
CMD [ "handler.entry_point.lambda_handler" ]
```
demos/nawqa_data_pull/README.md

Lines changed: 44 additions & 0 deletions

````markdown
# Retrieve data from the National Water Quality Assessment Program (NAWQA)

This example walks through using lithops to retrieve data from every NAWQA
monitoring site, then writes the results to parquet files on S3. Each
retrieval also searches the NLDI for neighboring sites with NAWQA data and
merges those data, assuming the monitoring site was relocated.

1. Set up a Python environment
    ```bash
    conda create --name dataretrieval-lithops -y python=3.11
    conda activate dataretrieval-lithops
    pip install -r requirements.txt
    ```

1. Configure compute and storage backends for [lithops](https://lithops-cloud.github.io/docs/source/configuration.html).
   The configuration in `lithops.yaml` uses AWS Lambda for [compute](https://lithops-cloud.github.io/docs/source/compute_config/aws_lambda.html) and AWS S3 for [storage](https://lithops-cloud.github.io/docs/source/storage_config/aws_s3.html).
   To use those backends, simply edit `lithops.yaml` with your `bucket` and `execution_role`.

1. Build a runtime image for Cubed
    ```bash
    export LITHOPS_CONFIG_FILE=$(pwd)/lithops.yaml
    lithops runtime build -b aws_lambda -f Dockerfile_dataretrieval dataretrieval-runtime
    ```

1. Download the site list
    ```bash
    wget https://www.sciencebase.gov/catalog/file/get/655d2063d34ee4b6e05cc9e6?f=__disk__b3%2F3e%2F5b%2Fb33e5b0038f004c2a48818d0fcc88a0921f3f689 -O NWQN_sites.csv
    ```

1. Create an S3 bucket for the output, then set it as an environment variable
    ```bash
    export DESTINATION_BUCKET=<path/to/bucket>
    ```

1. Run the script
    ```bash
    python retrieve_nawqa_with_lithops.py
    ```

## Cleaning up
To rebuild the Lithops image, delete the existing one by running
```bash
lithops runtime delete -b aws_lambda -d dataretrieval-runtime
```
````
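The merge step described above relabels every record pulled from neighboring sites under the original monitoring site before writing to parquet. A minimal sketch of that relabeling, using a stand-in DataFrame in place of the WQP response (the column name matches the script; the site numbers are hypothetical):

```python
import pandas as pd

# Hypothetical original site and its NLDI-discovered neighbors
site = "07144100"
neighbors = ["07144100", "07144101"]

# WQP expects agency-prefixed identifiers
wqp_ids = [f"USGS-{s}" for s in neighbors]
print(wqp_ids)  # ['USGS-07144100', 'USGS-07144101']

# Stand-in for the WQP results: records from all neighboring sites...
df = pd.DataFrame({"MonitoringLocationIdentifier": wqp_ids, "value": [1.0, 2.0]})

# ...are relabeled under the original site, merging the neighbors into one partition
df["MonitoringLocationIdentifier"] = f"USGS-{site}"
print(df["MonitoringLocationIdentifier"].unique())  # ['USGS-07144100']
```

This is why the output parquet dataset, partitioned on `MonitoringLocationIdentifier`, ends up with one partition per original NAWQA site rather than one per queried site.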

demos/nawqa_data_pull/lithops.yaml

Lines changed: 14 additions & 0 deletions

```yaml
lithops:
  backend: aws_lambda
  storage: aws_s3

aws:
  region: us-west-2

aws_lambda:
  execution_role: arn:aws:iam::807615458658:role/lambdaLithopsExecutionRole
  runtime: dataretrieval-runtime
  runtime_memory: 2000

aws_s3:
  bucket: arn:aws:s3:::cubed-thodson-temp
```
demos/nawqa_data_pull/requirements.txt

Lines changed: 8 additions & 0 deletions

```
boto3
dataretrieval[nldi]
lithops
pika
ps_mem
pyarrow
s3fs
tblib
```
demos/nawqa_data_pull/retrieve_nawqa_with_lithops.py

Lines changed: 112 additions & 0 deletions

```python
# Retrieve data from the National Water Quality Assessment Program (NAWQA)

import lithops
import math
import os
import pandas as pd

from dataretrieval import nldi, nwis, wqp

DESTINATION_BUCKET = os.environ.get('DESTINATION_BUCKET')
PROJECT = "National Water Quality Assessment Program (NAWQA)"


def map_retrieval(site):
    """Map function to pull data from NWIS and WQP"""
    site_list = find_neighboring_sites(site)
    # reformat for wqp
    site_list = [f"USGS-{site}" for site in site_list]

    df, _ = wqp.get_results(siteid=site_list,
                            project=PROJECT,
                            )

    # merge sites: relabel all records under the original site id
    df['MonitoringLocationIdentifier'] = f"USGS-{site}"

    if len(df) != 0:
        df.astype(str).to_parquet(f's3://{DESTINATION_BUCKET}/nwqn-samples.parquet',
                                  engine='pyarrow',
                                  partition_cols=['MonitoringLocationIdentifier'],
                                  compression='zstd')
        # optionally, `return df` for further processing


def find_neighboring_sites(site, search_factor=0.05):
    """Find sites upstream and downstream of the given site within a certain distance.

    Parameters
    ----------
    site : str
        8-digit site number.
    search_factor : float, optional
        Fraction of the estimated watershed length to use as the search distance.
    """
    site_df, _ = nwis.get_info(sites=site)
    drain_area_sq_mi = site_df["drain_area_va"].values[0]
    length = _estimate_watershed_length_km(drain_area_sq_mi)
    search_distance = length * search_factor
    # clip between 1 and 9999 km
    search_distance = max(1.0, min(9999.0, search_distance))

    upstream_gdf = nldi.get_features(
        feature_source="WQP",
        feature_id=f"USGS-{site}",
        navigation_mode="UM",
        distance=search_distance,
        data_source="nwissite",
    )

    downstream_gdf = nldi.get_features(
        feature_source="WQP",
        feature_id=f"USGS-{site}",
        navigation_mode="DM",
        distance=search_distance,
        data_source="nwissite",
    )

    features = pd.concat([upstream_gdf, downstream_gdf], ignore_index=True)

    df, _ = nwis.get_info(sites=list(features.identifier.str.strip('USGS-')))
    # drop sites with dissimilar drainage areas
    df = df.where(
        (df["drain_area_va"] / drain_area_sq_mi) > search_factor,
    ).dropna(how="all")

    return df["site_no"].to_list()


def _estimate_watershed_length_km(drain_area_sq_mi):
    """Estimate the diameter assuming a circular watershed.

    Parameters
    ----------
    drain_area_sq_mi : float
        The drainage area in square miles.

    Returns
    -------
    float
        The diameter of the watershed in kilometers.
    """
    # assume a circular watershed
    length_miles = 2 * (drain_area_sq_mi / math.pi) ** 0.5
    # convert to km
    return length_miles * 1.60934


if __name__ == "__main__":
    site_df = pd.read_csv(
        'NWQN_sites.csv',
        comment='#',
        dtype={'SITE_QW_ID': str, 'SITE_FLOW_ID': str},
    )

    site_list = site_df['SITE_QW_ID'].to_list()
    # site_list = site_list[:4]  # prune for testing

    fexec = lithops.FunctionExecutor(config_file="lithops.yaml")
    futures = fexec.map(map_retrieval, site_list)

    futures.get_result()
```
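The NLDI search distance in the script comes from a circular-watershed heuristic: estimate the watershed diameter from its drainage area, scale by `search_factor`, and clip to the 1–9999 km range the script enforces. A standalone sketch of that arithmetic (the 100 mi² drainage area is a hypothetical input):

```python
import math

def estimate_watershed_length_km(drain_area_sq_mi):
    # diameter of a circle with the given area, converted miles -> km
    length_miles = 2 * (drain_area_sq_mi / math.pi) ** 0.5
    return length_miles * 1.60934

drain_area_sq_mi = 100.0  # hypothetical drainage area
length_km = estimate_watershed_length_km(drain_area_sq_mi)
search_distance = max(1.0, min(9999.0, length_km * 0.05))  # search_factor = 0.05

print(round(length_km, 2))  # 18.16
print(search_distance)      # 1.0 -- small watersheds hit the 1 km floor
```

As the example shows, at the default `search_factor` any watershed under roughly 2,000 km of estimated length searches the minimum 1 km in each direction.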
