
Commit 0668bb8

feat/spark3.5: Spark History Server / fixes worker links on web-ui (#172)
* Add Spark History Server to the Spark Standalone cluster
* Configure eventLog for Spark Master and Workers
* Set `SPARK_PUBLIC_DNS=localhost` so Web UI worker links are accessible from the host
* Pin PySpark to 3.5.7 to prevent version mismatches between Driver and Workers
* Move Jupyter notebooks from `pyspark-4.x/` to `module5-batch-processing/`
* Major refactor of `fhv_zones_gcs.py`
* Update README with spark-submit instructions
* Drop unused `compose.yml` on root dir
1 parent 92eee01 commit 0668bb8

9 files changed: 91 additions & 47 deletions

module5-batch-processing/compose.spark-3.5-standalone.yaml

Lines changed: 25 additions & 5 deletions
@@ -5,12 +5,15 @@ x-spark-common:
   image: *spark-image
   environment:
     &spark-common-env
-    SPARK_NO_DAEMONIZE: true  # Forces the process to run in foreground (req. for Docker)
+    SPARK_NO_DAEMONIZE: true         # Forces the process to run in foreground (req. for Docker)
+    SPARK_PUBLIC_DNS: localhost      # Ensures Web UI links point to localhost instead of container IPs
+    GOOGLE_APPLICATION_CREDENTIALS: "/secrets/gcp_credentials.json"
   volumes:
     &spark-common-vol
-    - vol-spark-extra-jars:/opt/spark/extra-jars/
+    - ./logs/:/opt/spark/logs/
     - ./spark-3.5-standalone.conf:/opt/spark/conf/spark-standalone.conf
     - ~/.gcp/spark_credentials.json:/secrets/gcp_credentials.json
+    - vol-spark-extra-jars:/opt/spark/extra-jars/
   depends_on:
     &spark-common-depends-on
     spark-init:
@@ -77,7 +80,24 @@ services:
     depends_on:
       spark-master:
         condition: service_started
-    restart: on-failure:3
+    restart: on-failure:5
+
+  spark-history-server:
+    <<: *spark-common
+    container_name: spark-history-server
+    command: |
+      /opt/spark/sbin/start-history-server.sh
+        --properties-file /opt/spark/conf/spark-standalone.conf
+    environment:
+      <<: *spark-common-env
+      SPARK_HISTORY_OPTS: >-
+        -Dspark.history.fs.logDirectory=/opt/spark/logs/
+    ports:
+      - '18080:18080'
+    depends_on:
+      spark-master:
+        condition: service_started
+    restart: on-failure:5
 
   spark-init:
     image: *spark-image
@@ -89,10 +109,10 @@ services:
       - |
         apt-get update && apt-get install curl -y
         curl --create-dirs -O --output-dir /opt/spark/extra-jars/ https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/gcs-connector/hadoop3-2.2.32/gcs-connector-hadoop3-2.2.32-shaded.jar
+        chown -R 185:185 /opt/spark/extra-jars/
     volumes:
       - vol-spark-extra-jars:/opt/spark/extra-jars/
 
-
 volumes:
   vol-spark-extra-jars:
-    name: vol-spark-extra-jars
+    name: vol-spark-extra-jars
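
With the `spark-history-server` service above mapped to port 18080, completed applications can be checked from the host without opening a browser. A minimal sketch, assuming the stack is up and at least one job has finished; the host and port come from the port mapping above, and `/api/v1/applications` is Spark's standard monitoring REST endpoint:

```python
import json
from urllib.request import urlopen

# Host and port taken from the compose port mapping above ('18080:18080');
# /api/v1/applications is the Spark History Server's standard REST endpoint.
HISTORY_SERVER_URL = "http://localhost:18080/api/v1/applications"

def list_completed_apps() -> list[dict]:
    """Fetch the applications the History Server has parsed from /opt/spark/logs/."""
    with urlopen(HISTORY_SERVER_URL) as resp:
        return json.load(resp)

if __name__ == "__main__":
    for app in list_completed_apps():
        print(app["id"], "-", app["name"])
```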

module5-batch-processing/compose.yaml

Lines changed: 0 additions & 1 deletion
This file was deleted.

module5-batch-processing/pyspark-4.x/notebooks/pyspark_connect_dataframe-api_gcs.ipynb renamed to module5-batch-processing/notebooks/pyspark_connect_dataframe-api_gcs.ipynb

Lines changed: 6 additions & 4 deletions
@@ -91,7 +91,8 @@
     "| B00037|2019-10-01 00:08:12|2019-10-01 00:28:47| 264| 198| NULL| B00037|\n",
     "| B00053|2019-10-01 00:05:24|2019-10-01 00:53:03| 264| 264| NULL| NULL|\n",
     "+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+\n",
-    "only showing top 20 rows\n"
+    "only showing top 20 rows\n",
+    "\n"
    ]
   }
  ],
@@ -213,7 +214,8 @@
     "| 19| Queens| Bellerose| Boro Zone|\n",
     "| 20| Bronx| Belmont| Boro Zone|\n",
     "+----------+-------------+--------------------+------------+\n",
-    "only showing top 20 rows\n"
+    "only showing top 20 rows\n",
+    "\n"
    ]
   }
  ],
@@ -283,7 +285,7 @@
    "fhv.coalesce(6) \\\n",
    " .write \\\n",
    " .mode(\"overwrite\") \\\n",
-   " .parquet(\"gs://iobruno-lakehouse-raw/tmp/spark-connect-dataframe-api/\")"
+   " .parquet(\"gs://iobruno-lakehouse-raw/spark-connect/jupyter-dataframe-api/\")"
   ]
  },
 {
@@ -411,7 +413,7 @@
    "name": "stderr",
    "output_type": "stream",
    "text": [
-    "/Users/iobruno/Vault/data-engineering-labs/module5-batch-processing/pyspark-4.x/.venv/lib/python3.13/site-packages/pyspark/sql/connect/expressions.py:1091: UserWarning: WARN WindowExpression: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.\n",
+    "/Users/iobruno/Vault/data-engineering-labs/module5-batch-processing/pyspark-3.x/.venv/lib/python3.13/site-packages/pyspark/sql/connect/expressions.py:948: UserWarning: WARN WindowExpression: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.\n",
     " warnings.warn(\n"
    ]
   },
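
The stderr hunk above keeps the `WindowExpression` warning: a window function with no `partitionBy` pulls the whole dataset into a single partition. The notebook cell that triggers it isn't part of this diff, so the following is only a sketch of the fix, with column names assumed from the FHV schema rather than taken from the commit:

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("window-sketch").getOrCreate()

# Stand-in data; the real notebooks read the FHV trip data from GCS.
df = spark.createDataFrame(
    [("B00037", 264, "2019-10-01 00:08:12"), ("B00053", 264, "2019-10-01 00:05:24")],
    ["dispatching_base_num", "PUlocationID", "pickup_datetime"],
)

# Unpartitioned window: triggers the warning, all rows move to one partition.
w_global = Window.orderBy("pickup_datetime")

# Partitioned window: each pickup zone is processed independently across executors.
w_by_zone = Window.partitionBy("PUlocationID").orderBy("pickup_datetime")

df.withColumn("trip_seq", F.row_number().over(w_by_zone)).show()
```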

module5-batch-processing/pyspark-4.x/notebooks/pyspark_connect_sparksql_gcs.ipynb renamed to module5-batch-processing/notebooks/pyspark_connect_sparksql_gcs.ipynb

Lines changed: 5 additions & 3 deletions
@@ -98,7 +98,8 @@
     "| B00037|2019-10-01 00:08:12|2019-10-01 00:28:47| 264| 198| NULL| B00037|\n",
     "| B00053|2019-10-01 00:05:24|2019-10-01 00:53:03| 264| 264| NULL| NULL|\n",
     "+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+\n",
-    "only showing top 20 rows\n"
+    "only showing top 20 rows\n",
+    "\n"
    ]
   }
  ],
@@ -236,7 +237,8 @@
     "| 19| Queens| Bellerose| Boro Zone|\n",
     "| 20| Bronx| Belmont| Boro Zone|\n",
     "+----------+-------------+--------------------+------------+\n",
-    "only showing top 20 rows\n"
+    "only showing top 20 rows\n",
+    "\n"
    ]
   }
  ],
@@ -334,7 +336,7 @@
    "df.coalesce(6)\\\n",
    " .write\\\n",
    " .mode(\"overwrite\")\\\n",
-   " .parquet(\"gs://iobruno-lakehouse-raw/tmp/spark-connect-sparksql/\")"
+   " .parquet(\"gs://iobruno-lakehouse-raw/spark-connect/jupyter-sparksql/\")"
   ]
  },
 {
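
Both notebooks finish with `coalesce(6)` before the parquet write. As a brief aside (a sketch, not part of the commit): `coalesce` merges existing partitions without a full shuffle, which is usually what you want when the only goal is fewer, larger output files, whereas `repartition` would rebalance rows evenly at the cost of a shuffle.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("coalesce-sketch").getOrCreate()
df = spark.range(1_000_000)  # stand-in for the joined DataFrames built in the notebooks

# coalesce(6): at most 6 partitions, no full shuffle -> 6 parquet files on write.
df.coalesce(6).write.mode("overwrite").parquet("/tmp/coalesce-sketch/")

# repartition(6) would also yield 6 files, but it forces a full shuffle to
# rebalance rows evenly across the 6 partitions first.
# df.repartition(6).write.mode("overwrite").parquet("/tmp/repartition-sketch/")
```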

module5-batch-processing/pyspark-3.x/README.md

Lines changed: 24 additions & 1 deletion
@@ -33,9 +33,32 @@ pre-commit install
 
 **4.** Spin up the Spark Cluster with:
 ```shell
-docker compose -f ../compose.yaml up -d
+docker compose -f ../compose.spark-3.5-standalone.yaml up -d
 ```
 
+**5.** Spark Web UI
+- Spark Master Web UI can be accessed at [http://localhost:4040](http://localhost:4040)
+- Spark History Server can be accessed at [http://localhost:18080](http://localhost:18080)
+
+
+## Spark-submit Application
+
+### Local (Spark Driver running on local machine)
+
+With `--deploy-mode client` (default), the Spark Driver runs locally and doesn't pick up [spark-3.5-standalone.conf](../compose.spark-3.5-standalone.yaml), so the `--conf spark.hadoop.*` options must be set explicitly.
+
+```shell
+spark-submit \
+    --master spark://localhost:7077 \
+    --packages "com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.32" \
+    --conf spark.eventLog.enabled=true \
+    --conf spark.eventLog.dir=file://$(pwd)/../logs/ \
+    --conf spark.driver.userClassPathFirst=true \
+    --conf spark.executor.userClassPathFirst=true \
+    --conf spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem \
+    --conf spark.hadoop.fs.AbstractFileSystem.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS \
+    fhv_zones_gcs.py
+```
 
 ## Compatibility Matrix
 
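
For completeness, the same flags can be set programmatically when the Driver is started from a plain Python process instead of the `spark-submit` CLI. This is only a sketch mirroring the command above; the app name and the local event-log path are placeholders:

```python
from pyspark.sql import SparkSession

# Programmatic equivalent of the spark-submit flags above (client deploy mode:
# Driver on the local machine, executors on the standalone cluster).
spark = (
    SparkSession.builder
    .master("spark://localhost:7077")
    .appName("fhv-zones-local-driver")  # placeholder name
    .config("spark.jars.packages", "com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.32")
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "file:///path/to/module5-batch-processing/logs/")  # adjust to your checkout
    .config("spark.driver.userClassPathFirst", "true")
    .config("spark.executor.userClassPathFirst", "true")
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
    .getOrCreate()
)
```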

module5-batch-processing/pyspark-4.x/fhv_zones_gcs.py renamed to module5-batch-processing/pyspark-3.x/fhv_zones_gcs.py

Lines changed: 20 additions & 26 deletions
@@ -1,4 +1,3 @@
-from os import environ as env
 from os import getenv
 
 from pyspark.sql import DataFrame, SparkSession
@@ -47,41 +46,35 @@ def join_dfs_with_spark_sql(spark: SparkSession) -> DataFrame:
     )
 
 
-def config_spark_session(name: str, master: str = "local[*]") -> SparkSession:
-    spark = (
+def get_spark_session() -> SparkSession:
+    return (
         SparkSession.builder
         .config("spark.sql.execution.arrow.pyspark.enabled", "true")
         .config("spark.driver.memory", "2g")
-        .config("spark.executor.memory", "8g")
+        .config("spark.executor.memory", "4g")
        .config("spark.cores.max", 8)
-        .appName(name)
-        .master(master)
+        .appName("pyspark-3.5-pipeline")
         .getOrCreate()
     )
-    spark._jsc.hadoopConfiguration().set(
-        "google.cloud.auth.service.account.json.keyfile", env["GOOGLE_APPLICATION_CREDENTIALS"]
-    )
-    return spark
-
 
 def main():
-    spark_master = getenv(key="SPARK_MASTER", default="local[*]")
-    spark = config_spark_session(name="pyspark-playground", master=spark_master)
+    spark = get_spark_session()
+    logger = spark._jvm.org.apache.log4j.LogManager.getLogger(__name__)
 
     fhv_gcs_path = getenv(
         key="FHV_GCS_PATH",
-        default="gs://iobruno-datalake-raw/dtc_ny_taxi_tripdata/fhv/fhv_tripdata_2019-01.snappy.parquet",
+        default="gs://iobruno-lakehouse-raw/nyc_tlc_dataset/fhv_trip_data/fhv_tripdata_2019-01.snappy.parquet",
     )
     zone_lookup_gcs_path = getenv(
         key="ZONE_LOOKUP_PATH",
-        default="gs://iobruno-datalake-raw/dtc_ny_taxi_tripdata/zone_lookup/taxi_zone_lookup.csv.gz",
+        default="gs://iobruno-lakehouse-raw/nyc_tlc_dataset/zone_lookup/taxi_zone_lookup.csv.gz",
     )
 
-    print(f"Now fetching 'FHV' Dataset: {fhv_gcs_path}")
-
+    logger.info(f"Now fetching 'FHV' Dataset: {fhv_gcs_path}")
    fhv: DataFrame = spark.read.parquet(fhv_gcs_path)
-    print(f"Now fetching 'Zone Lookup' Dataset: {zone_lookup_gcs_path}")
+    fhv.createTempView("fhv")
 
+    logger.info(f"Now fetching 'Zone Lookup' Dataset: {zone_lookup_gcs_path}")
     zones: DataFrame = (
         spark.read
         .option("header", True)
@@ -95,21 +88,22 @@ def main():
         )
         .csv(path=zone_lookup_gcs_path)
     )
-
-    print("Creating Temporaty Views from DataFrames...")
-    fhv.createTempView("fhv")
     zones.createTempView("zones")
 
     # Join DataFrames with SparkSQL
-    print("Joining DataFrames with SparkSQL")
+    logger.info("Joining DataFrames with SparkSQL")
     sdf = join_dfs_with_spark_sql(spark)
 
-    print("Preparing to write resulting DataFrame...")
-    sdf.write.option("compression", "snappy").mode("overwrite").parquet(
-        "gs://iobruno-datalake-raw/spark-warehouse/"
+    logger.info("Preparing to write resulting DataFrame...")
+    (
+        sdf.write
+        .option("compression", "snappy")
+        .mode("overwrite")
+        .parquet("gs://iobruno-lakehouse-raw/spark-warehouse/")
     )
 
-    print("All done!")
+    logger.info("All done!")
+    spark.stop()
 
 
 if __name__ == "__main__":
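
The body of `join_dfs_with_spark_sql` is not part of this diff; based on the `fhv` and `zones` temp views registered in `main`, a hypothetical sketch of what the SparkSQL join could look like (column names are assumed from the NYC TLC FHV and zone-lookup schemas, not taken from the commit):

```python
from pyspark.sql import DataFrame, SparkSession

def join_dfs_with_spark_sql(spark: SparkSession) -> DataFrame:
    # Hypothetical sketch only; the real query lives outside this diff.
    # Assumed columns: PUlocationID/DOlocationID on fhv, LocationID/Zone on zones.
    return spark.sql("""
        SELECT fhv.dispatching_base_num,
               fhv.pickup_datetime,
               pu.Zone AS pickup_zone,
               dz.Zone AS dropoff_zone
        FROM fhv
        JOIN zones AS pu ON fhv.PUlocationID = pu.LocationID
        JOIN zones AS dz ON fhv.DOlocationID = dz.LocationID
    """)
```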

module5-batch-processing/pyspark-3.x/pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -6,7 +6,7 @@ readme = "README.md"
 requires-python = ">=3.12,<3.14"
 
 dependencies = [
-    "pyspark[connect]>=3.5.7,<4.0",
+    "pyspark[connect]==3.5.7,<4.0",
     "pyarrow>=23.0.0,<24.0",
 ]
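
The switch from `>=3.5.7` to `==3.5.7` is what enforces the Driver/Worker version match mentioned in the commit message. A quick sanity check after installing the project dependencies, as a sketch (the expected version string is taken from this pin):

```python
from importlib.metadata import version

expected = "3.5.7"  # must match the Spark version the standalone cluster image runs
installed = version("pyspark")

# If these diverge, the local Driver and the cluster Workers can disagree on
# wire formats at runtime, which is exactly what the pin prevents.
assert installed == expected, f"pyspark {installed} installed, expected {expected}"
print(f"pyspark {installed} matches the pinned cluster version")
```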

module5-batch-processing/pyspark-3.x/uv.lock

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default.

module5-batch-processing/spark-3.5-standalone.conf

Lines changed: 4 additions & 0 deletions
@@ -14,6 +14,10 @@ spark.worker.cleanup.interval=600
 spark.shuffle.service.db.enabled=true
 spark.shuffle.service.db.backend=ROCKSDB
 
+# Event Log (History Server)
+spark.eventLog.enabled=true
+spark.eventLog.dir=/opt/spark/logs/
+
 # Classpath
 spark.driver.extraClassPath=/opt/spark/extra-jars/*
 spark.executor.extraClassPath=/opt/spark/extra-jars/*
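
`spark.eventLog.dir` points at `/opt/spark/logs/`, which the compose file bind-mounts from `./logs/` on the host and which the History Server's `spark.history.fs.logDirectory` also reads. A small sketch for confirming that event logs are actually landing there before opening http://localhost:18080 (the host path is an assumption about where the repo is checked out):

```python
from pathlib import Path

# ./logs/ on the host is bind-mounted to /opt/spark/logs/ in the containers,
# so each completed application should leave one event log file here.
logs_dir = Path("module5-batch-processing/logs")  # adjust to your checkout

for event_log in sorted(logs_dir.iterdir()):
    print(f"{event_log.name}  ({event_log.stat().st_size} bytes)")
```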
