Skip to content

Commit 2a21efb

Browse files
committed
Formatting pass
1 parent 005e3fa commit 2a21efb

13 files changed

Lines changed: 209 additions & 151 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,3 +162,4 @@ cython_debug/
162162
# and can be added to the global gitignore or merged into this file. For a more nuclear
163163
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
164164
#.idea/
165+
examples/remote-tasks/output/app.pex

README.md

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,17 @@
11
# Salt
22
<img width="994" height="340" alt="image" src="https://github.com/user-attachments/assets/19382711-28f1-44ae-8ff6-93a9915f72c7" />
33

4-
Salt is a Workflow/Task Scheduler system that aims to minimize dependency management for users.
4+
Salt is a Workflow/Task Scheduler framework that does two things:
5+
1. Helps reduce dependency management by providing tools to package your workflows into executables along with their dependencies;
6+
2. Provides a framework that can be used to define distributed DAGs.
57

6-
You can write dags and tasks in your favourite language (between the supported ones), with a set of unique dependencies,
7-
and build them with Salt framework.
8-
9-
After building the tasks, Salt can schedule them in isolated contexts that have all the dependencies needed to run.
8+
You can write dags and tasks in a supported language, define your dependencies, and package them all together using Salt.
109

1110
## Why Salt
12-
In most workflow managers (e.g. Airflow) dependency management is complex. Due to the Pythonic structure, it enforces you to use Python and take care
13-
of dependencies, docker images and environments, most of the time leading to complex and clashing environments
14-
that lead to a lot of work to keep organized.
15-
16-
Salt tries to remove all of this. Your Workflow is self-contained with its dependencies and does not need anything but Salt framework
17-
to execute.
11+
In most workflow managers (e.g. Airflow) dependency management can get extremely complex.
1812

19-
Tasks in the Workflows are coded using Salt framework.
13+
Salt simplifies dependency management while still providing a DAG/Task scheduling framework.
14+
Your Workflow is self-contained with its dependencies and does not need anything but the Salt framework to execute.
2015

2116
## Getting Started
2217
Salt development requires `pdm` Python Package manager to be installed.

pyproject.toml

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@ dependencies = [
1313
"loguru<1.0",
1414
"redis<7.0",
1515
"build<2.0",
16-
"grpcio-tools",
17-
"boto3",
18-
"PyYaml",
19-
"apscheduler",
20-
"pydantic_settings",
21-
"pika<2",
22-
"docker"
16+
"grpcio<2.0",
17+
"boto3<2.0",
18+
"PyYaml<7.0",
19+
"apscheduler<4.0",
20+
"pydantic_settings<3.0",
21+
"pika<2.0",
22+
"docker<8.0"
2323
]
2424

2525
requires-python = "<3.11"
@@ -33,7 +33,7 @@ salt = "salt.cli:app"
3333
[project.optional-dependencies]
3434
dev = [
3535
"pytest>=8.4.1",
36-
"grpcio-tools"
36+
"grpcio-tools<2.0"
3737
]
3838

3939
[tool.setuptools.dynamic]

src/salt/backend/scheduler/scheduler.py

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -52,30 +52,33 @@ def queue_workflow(self, workflow_data):
5252
TODO The handling of KILL signal (e.g. to stop the workflow) is handled using Kafka.
5353
Once the job is picked by a worker, the worker starts listening for KILL signals through Kafka or similar.
5454
"""
55-
rabbit = pika.BlockingConnection(pika.ConnectionParameters(
56-
host=settings.rabbit_settings.rabbit_url,
57-
credentials=pika.PlainCredentials(settings.rabbit_settings.rabbit_user,
58-
settings.rabbit_settings.rabbit_password)
59-
))
55+
rabbit = pika.BlockingConnection(
56+
pika.ConnectionParameters(
57+
host=settings.rabbit_settings.rabbit_url,
58+
credentials=pika.PlainCredentials(
59+
settings.rabbit_settings.rabbit_user,
60+
settings.rabbit_settings.rabbit_password,
61+
),
62+
)
63+
)
6064
channel = rabbit.channel()
6165
queue = workflow_data.get("queue", "default")
6266
queue = "workflows" if queue == "default" else queue
6367

6468
channel.queue_declare(queue="workflows", durable=True)
6569
channel.basic_publish(
66-
exchange='',
70+
exchange="",
6771
routing_key=queue,
6872
body=json.dumps(workflow_data),
69-
properties=pika.BasicProperties(
70-
delivery_mode=2 # Make message persistent
71-
)
73+
properties=pika.BasicProperties(delivery_mode=2), # Make message persistent
7274
)
7375

7476
channel.close()
7577
rabbit.close()
7678

77-
logger.debug(f"Queued [{workflow_data['workflow_id']}] v{workflow_data['version']} to queue: {queue}")
78-
79+
logger.debug(
80+
f"Queued [{workflow_data['workflow_id']}] v{workflow_data['version']} to queue: {queue}"
81+
)
7982

8083
def schedule_workflow(self, scheduler, workflow_data):
8184
schedule = workflow_data.get("schedule", {})
@@ -87,15 +90,15 @@ def schedule_workflow(self, scheduler, workflow_data):
8790
trigger=IntervalTrigger(seconds=int(schedule["every_seconds"])),
8891
id=str(uuid.uuid4()),
8992
args=[workflow_data],
90-
replace_existing=False
93+
replace_existing=False,
9194
)
9295
elif schedule.get("type") == "cron":
9396
scheduler.add_job(
9497
func=self.queue_workflow,
9598
trigger=CronTrigger.from_crontab(schedule["cron"]),
9699
id=str(uuid.uuid4()),
97100
args=[workflow_data],
98-
replace_existing=False
101+
replace_existing=False,
99102
)
100103
else:
101104
print(f"Unknown schedule type for workflow {workflow_id}")

src/salt/backend/worker/settings.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from pydantic_settings import BaseSettings
44

5+
56
class WorkflowWorkerSettings(BaseSettings):
67
queue: Optional[str] = "workflows"
78

src/salt/backend/worker/workflow_worker.py

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
55
Once a Workflow Schedule is consumed by the Worker, the Workflow is executed on it and the atomic tasks will
66
execute as defined."""
7+
78
import json
89
import pathlib
910
import subprocess
@@ -22,39 +23,53 @@
2223

2324
class WorkflowWorker:
2425
def __init__(self):
25-
self._rabbit = pika.BlockingConnection(pika.ConnectionParameters(
26-
host=settings.rabbit_settings.rabbit_url,
27-
credentials=pika.PlainCredentials(settings.rabbit_settings.rabbit_user,
28-
settings.rabbit_settings.rabbit_password)
29-
))
26+
self._rabbit = pika.BlockingConnection(
27+
pika.ConnectionParameters(
28+
host=settings.rabbit_settings.rabbit_url,
29+
credentials=pika.PlainCredentials(
30+
settings.rabbit_settings.rabbit_user,
31+
settings.rabbit_settings.rabbit_password,
32+
),
33+
)
34+
)
3035

3136
self._workflow_registry = workflow_registry.WorkflowRegistry()
3237

3338
def execute_workflow(self, ch, method, properties, body):
3439
try:
3540
workflow_obj = workflow_pb2.Workflow()
36-
ParseDict(json.loads(body.decode()), workflow_obj, ignore_unknown_fields=True)
41+
ParseDict(
42+
json.loads(body.decode()), workflow_obj, ignore_unknown_fields=True
43+
)
3744

3845
# Download and execute workflow
3946
with tempfile.TemporaryDirectory() as tmp_dir:
4047
workflow_file_path = pathlib.Path(tmp_dir) / "workflow.pex"
41-
self._workflow_registry.s3_download_workflow_binary(workflow=workflow_obj, workflow_file_path=workflow_file_path)
48+
self._workflow_registry.s3_download_workflow_binary(
49+
workflow=workflow_obj, workflow_file_path=workflow_file_path
50+
)
4251
if not os.path.exists(workflow_file_path):
43-
logger.error(f"Workflow file was not downloaded correctly at: {workflow_file_path}")
52+
logger.error(
53+
f"Workflow file was not downloaded correctly at: {workflow_file_path}"
54+
)
4455

45-
logger.debug(f"Downloaded workflow binary from S3: {workflow_obj.workflow_id}")
56+
logger.debug(
57+
f"Downloaded workflow binary from S3: {workflow_obj.workflow_id}"
58+
)
4659

4760
# Execute Workflow
4861
result = subprocess.run(
4962
["python3.10", str(workflow_file_path)],
5063
stdout=subprocess.PIPE,
51-
stderr=subprocess.PIPE
64+
stderr=subprocess.PIPE,
5265
)
5366

5467
logger.debug(result.stdout)
5568
logger.debug(result)
5669

57-
ch.basic_ack(delivery_tag=method.delivery_tag) # Acknowledge successful processing
70+
ch.basic_ack(
71+
delivery_tag=method.delivery_tag
72+
) # Acknowledge successful processing
5873
except Exception as e:
5974
print(f"Error processing message: {e}")
6075
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
@@ -66,10 +81,13 @@ def loop(self):
6681
channel.queue_declare(queue=queue_name, durable=True)
6782

6883
channel.basic_qos(prefetch_count=1) # Ensure fair dispatch
69-
channel.basic_consume(queue=queue_name, on_message_callback=self.execute_workflow)
84+
channel.basic_consume(
85+
queue=queue_name, on_message_callback=self.execute_workflow
86+
)
7087

7188
print(" [*] Waiting for messages. To exit press CTRL+C")
7289
channel.start_consuming()
7390

74-
if __name__ == '__main__':
91+
92+
if __name__ == "__main__":
7593
WorkflowWorker().loop()

src/salt/backend/workflow_service/utils.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@ def request_generator(workflow: Workflow, file_path: Path):
4747
salt_data = yaml.safe_load(f)
4848
wf = ParseDict(salt_data, Workflow())
4949

50-
with grpc.insecure_channel(f"{workflow_service_settings.workflow_service_url}:{workflow_service_settings.workflow_service_port}") as channel:
50+
with grpc.insecure_channel(
51+
f"{workflow_service_settings.workflow_service_url}:{workflow_service_settings.workflow_service_port}"
52+
) as channel:
5153
response = WorkflowServiceStub(channel=channel).RegisterWorkflow(
5254
request_generator(wf, pex.pop())
5355
)
@@ -58,6 +60,7 @@ def get_workflow_binary():
5860
"""Given workflow metadata, downloads the binary from S3."""
5961
pass
6062

63+
6164
if __name__ == "__main__":
6265
import pathlib
6366

src/salt/backend/workflow_service/workflow_pb2.py

Lines changed: 22 additions & 19 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/salt/backend/workflow_service/workflow_pb2.pyi

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,27 @@ class Trigger(_message.Message):
1313
def __init__(self, type: _Optional[str] = ...) -> None: ...
1414

1515
class Workflow(_message.Message):
16-
__slots__ = ("workflow_id", "chunk", "version", "paused", "owner", "schedule", "triggers", "queue")
16+
__slots__ = (
17+
"workflow_id",
18+
"chunk",
19+
"version",
20+
"paused",
21+
"owner",
22+
"schedule",
23+
"triggers",
24+
"queue",
25+
)
26+
1727
class ScheduleEntry(_message.Message):
1828
__slots__ = ("key", "value")
1929
KEY_FIELD_NUMBER: _ClassVar[int]
2030
VALUE_FIELD_NUMBER: _ClassVar[int]
2131
key: str
2232
value: str
23-
def __init__(self, key: _Optional[str] = ..., value: _Optional[str] = ...) -> None: ...
33+
def __init__(
34+
self, key: _Optional[str] = ..., value: _Optional[str] = ...
35+
) -> None: ...
36+
2437
WORKFLOW_ID_FIELD_NUMBER: _ClassVar[int]
2538
CHUNK_FIELD_NUMBER: _ClassVar[int]
2639
VERSION_FIELD_NUMBER: _ClassVar[int]
@@ -37,7 +50,17 @@ class Workflow(_message.Message):
3750
schedule: _containers.ScalarMap[str, str]
3851
triggers: _containers.RepeatedCompositeFieldContainer[Trigger]
3952
queue: str
40-
def __init__(self, workflow_id: _Optional[str] = ..., chunk: _Optional[bytes] = ..., version: _Optional[str] = ..., paused: bool = ..., owner: _Optional[str] = ..., schedule: _Optional[_Mapping[str, str]] = ..., triggers: _Optional[_Iterable[_Union[Trigger, _Mapping]]] = ..., queue: _Optional[str] = ...) -> None: ...
53+
def __init__(
54+
self,
55+
workflow_id: _Optional[str] = ...,
56+
chunk: _Optional[bytes] = ...,
57+
version: _Optional[str] = ...,
58+
paused: bool = ...,
59+
owner: _Optional[str] = ...,
60+
schedule: _Optional[_Mapping[str, str]] = ...,
61+
triggers: _Optional[_Iterable[_Union[Trigger, _Mapping]]] = ...,
62+
queue: _Optional[str] = ...,
63+
) -> None: ...
4164

4265
class Result(_message.Message):
4366
__slots__ = ("success", "message")

0 commit comments

Comments
 (0)