Skip to content

Commit a7d52da

Browse files
eakmanrqnewtonapple
authored andcommitted
feat: Databricks grants (#5436)
1 parent ec7a704 commit a7d52da

4 files changed

Lines changed: 334 additions & 31 deletions

File tree

sqlmesh/core/engine_adapter/databricks.py

Lines changed: 106 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from functools import partial
66

77
from sqlglot import exp
8+
89
from sqlmesh.core.dialect import to_schema
910
from sqlmesh.core.engine_adapter.shared import (
1011
CatalogSupport,
@@ -23,7 +24,7 @@
2324
import pandas as pd
2425

2526
from sqlmesh.core._typing import SchemaName, TableName, SessionProperties
26-
from sqlmesh.core.engine_adapter._typing import DF, PySparkSession, Query
27+
from sqlmesh.core.engine_adapter._typing import DF, PySparkSession, Query, GrantsConfig, DCL
2728

2829
logger = logging.getLogger(__name__)
2930

@@ -34,6 +35,7 @@ class DatabricksEngineAdapter(SparkEngineAdapter):
3435
SUPPORTS_CLONING = True
3536
SUPPORTS_MATERIALIZED_VIEWS = True
3637
SUPPORTS_MATERIALIZED_VIEW_SCHEMA = True
38+
SUPPORTS_GRANTS = True
3739
# Spark has this set to false for compatibility when mixing with Trino but that isn't a concern with Databricks
3840
QUOTE_IDENTIFIERS_IN_VIEWS = True
3941
SCHEMA_DIFFER_KWARGS = {
@@ -151,6 +153,109 @@ def spark(self) -> PySparkSession:
151153
def catalog_support(self) -> CatalogSupport:
152154
return CatalogSupport.FULL_SUPPORT
153155

156+
@staticmethod
157+
def _grant_object_kind(table_type: DataObjectType) -> str:
158+
if table_type == DataObjectType.VIEW:
159+
return "VIEW"
160+
if table_type == DataObjectType.MATERIALIZED_VIEW:
161+
return "MATERIALIZED VIEW"
162+
return "TABLE"
163+
164+
def _dcl_grants_config_expr(
165+
self,
166+
dcl_cmd: t.Type[DCL],
167+
table: exp.Table,
168+
grant_config: GrantsConfig,
169+
table_type: DataObjectType = DataObjectType.TABLE,
170+
) -> t.List[exp.Expression]:
171+
expressions: t.List[exp.Expression] = []
172+
if not grant_config:
173+
return expressions
174+
175+
object_kind = self._grant_object_kind(table_type)
176+
for privilege, principals in grant_config.items():
177+
for principal in principals:
178+
args: t.Dict[str, t.Any] = {
179+
"privileges": [exp.GrantPrivilege(this=exp.Var(this=privilege))],
180+
"securable": table.copy(),
181+
"principals": [exp.to_identifier(principal.lower())],
182+
}
183+
184+
if object_kind:
185+
args["kind"] = exp.Var(this=object_kind)
186+
187+
expressions.append(dcl_cmd(**args)) # type: ignore[arg-type]
188+
189+
return expressions
190+
191+
def _apply_grants_config_expr(
192+
self,
193+
table: exp.Table,
194+
grant_config: GrantsConfig,
195+
table_type: DataObjectType = DataObjectType.TABLE,
196+
) -> t.List[exp.Expression]:
197+
return self._dcl_grants_config_expr(exp.Grant, table, grant_config, table_type)
198+
199+
def _revoke_grants_config_expr(
200+
self,
201+
table: exp.Table,
202+
grant_config: GrantsConfig,
203+
table_type: DataObjectType = DataObjectType.TABLE,
204+
) -> t.List[exp.Expression]:
205+
return self._dcl_grants_config_expr(exp.Revoke, table, grant_config, table_type)
206+
207+
def _get_current_grants_config(self, table: exp.Table) -> GrantsConfig:
208+
if schema_identifier := table.args.get("db"):
209+
schema_name = schema_identifier.this
210+
else:
211+
schema_name = self.get_current_database()
212+
if catalog_identifier := table.args.get("catalog"):
213+
catalog_name = catalog_identifier.this
214+
else:
215+
catalog_name = self.get_current_catalog()
216+
table_name = table.args.get("this").this # type: ignore
217+
218+
grant_expr = (
219+
exp.select("privilege_type", "grantee")
220+
.from_(
221+
exp.table_(
222+
"table_privileges",
223+
db="information_schema",
224+
catalog=catalog_name,
225+
)
226+
)
227+
.where(
228+
exp.and_(
229+
exp.column("table_catalog").eq(exp.Literal.string(catalog_name.lower())),
230+
exp.column("table_schema").eq(exp.Literal.string(schema_name.lower())),
231+
exp.column("table_name").eq(exp.Literal.string(table_name.lower())),
232+
exp.column("grantor").eq(exp.func("current_user")),
233+
exp.column("grantee").neq(exp.func("current_user")),
234+
# We only care about explicitly granted privileges and not inherited ones
235+
# if this is removed you would see grants inherited from the catalog get returned
236+
exp.column("inherited_from").eq(exp.Literal.string("NONE")),
237+
)
238+
)
239+
)
240+
241+
results = self.fetchall(grant_expr)
242+
243+
grants_dict: GrantsConfig = {}
244+
for privilege_raw, grantee_raw in results:
245+
if privilege_raw is None or grantee_raw is None:
246+
continue
247+
248+
privilege = str(privilege_raw)
249+
grantee = str(grantee_raw)
250+
if not privilege or not grantee:
251+
continue
252+
253+
grantees = grants_dict.setdefault(privilege, [])
254+
if grantee not in grantees:
255+
grantees.append(grantee)
256+
257+
return grants_dict
258+
154259
def _begin_session(self, properties: SessionProperties) -> t.Any:
155260
"""Begin a new session."""
156261
# Align the different possible connectors to a single catalog

tests/core/engine_adapter/integration/__init__.py

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -746,17 +746,25 @@ def upsert_sql_model(self, model_definition: str) -> t.Tuple[Context, SqlModel]:
746746
self._context.upsert_model(model)
747747
return self._context, model
748748

749-
def _get_create_user_or_role(self, username: str, password: t.Optional[str] = None) -> str:
749+
def _get_create_user_or_role(
750+
self, username: str, password: t.Optional[str] = None
751+
) -> t.Tuple[str, t.Optional[str]]:
750752
password = password or random_id()
751753
if self.dialect == "postgres":
752-
return f"CREATE USER \"{username}\" WITH PASSWORD '{password}'"
754+
return username, f"CREATE USER \"{username}\" WITH PASSWORD '{password}'"
753755
if self.dialect == "snowflake":
754-
return f"CREATE ROLE {username}"
756+
return username, f"CREATE ROLE {username}"
757+
if self.dialect == "databricks":
758+
# Creating an account-level group in Databricks requires making REST API calls so we are going to
759+
# use a pre-created group instead. We assume the suffix on the name is the unique id
760+
return "_".join(username.split("_")[:-1]), None
755761
raise ValueError(f"User creation not supported for dialect: {self.dialect}")
756762

757-
def _create_user_or_role(self, username: str, password: t.Optional[str] = None) -> None:
758-
create_user_sql = self._get_create_user_or_role(username, password)
759-
self.engine_adapter.execute(create_user_sql)
763+
def _create_user_or_role(self, username: str, password: t.Optional[str] = None) -> str:
764+
username, create_user_sql = self._get_create_user_or_role(username, password)
765+
if create_user_sql:
766+
self.engine_adapter.execute(create_user_sql)
767+
return username
760768

761769
@contextmanager
762770
def create_users_or_roles(self, *role_names: str) -> t.Iterator[t.Dict[str, str]]:
@@ -769,7 +777,7 @@ def create_users_or_roles(self, *role_names: str) -> t.Iterator[t.Dict[str, str]
769777
self.add_test_suffix(f"test_{role_name}"), dialect=self.dialect
770778
).sql(dialect=self.dialect)
771779
password = random_id()
772-
self._create_user_or_role(user_name, password)
780+
user_name = self._create_user_or_role(user_name, password)
773781
created_users.append(user_name)
774782
roles[role_name] = user_name
775783

@@ -779,6 +787,18 @@ def create_users_or_roles(self, *role_names: str) -> t.Iterator[t.Dict[str, str]
779787
for user_name in created_users:
780788
self._cleanup_user_or_role(user_name)
781789

790+
def get_insert_privilege(self) -> str:
791+
if self.dialect == "databricks":
792+
# This would really be "MODIFY" but for the purposes of having this be unique from UPDATE
793+
# we return "MANAGE" instead
794+
return "MANAGE"
795+
return "INSERT"
796+
797+
def get_update_privilege(self) -> str:
798+
if self.dialect == "databricks":
799+
return "MODIFY"
800+
return "UPDATE"
801+
782802
def _cleanup_user_or_role(self, user_name: str) -> None:
783803
"""Helper function to clean up a PostgreSQL user and all their dependencies."""
784804
try:
@@ -792,6 +812,8 @@ def _cleanup_user_or_role(self, user_name: str) -> None:
792812
self.engine_adapter.execute(f'DROP USER IF EXISTS "{user_name}"')
793813
elif self.dialect == "snowflake":
794814
self.engine_adapter.execute(f"DROP ROLE IF EXISTS {user_name}")
815+
elif self.dialect == "databricks":
816+
pass
795817
except Exception:
796818
pass
797819

tests/core/engine_adapter/integration/test_integration.py

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4036,23 +4036,24 @@ def test_sync_grants_config(ctx: TestContext) -> None:
40364036
)
40374037

40384038
table = ctx.table("sync_grants_integration")
4039-
4039+
insert_privilege = ctx.get_insert_privilege()
4040+
update_privilege = ctx.get_update_privilege()
40404041
with ctx.create_users_or_roles("reader", "writer", "admin") as roles:
40414042
ctx.engine_adapter.create_table(table, {"id": exp.DataType.build("INT")})
40424043

40434044
initial_grants = {
40444045
"SELECT": [roles["reader"]],
4045-
"INSERT": [roles["writer"]],
4046+
insert_privilege: [roles["writer"]],
40464047
}
40474048
ctx.engine_adapter.sync_grants_config(table, initial_grants)
40484049

40494050
current_grants = ctx.engine_adapter._get_current_grants_config(table)
40504051
assert set(current_grants.get("SELECT", [])) == {roles["reader"]}
4051-
assert set(current_grants.get("INSERT", [])) == {roles["writer"]}
4052+
assert set(current_grants.get(insert_privilege, [])) == {roles["writer"]}
40524053

40534054
target_grants = {
40544055
"SELECT": [roles["writer"], roles["admin"]],
4055-
"UPDATE": [roles["admin"]],
4056+
update_privilege: [roles["admin"]],
40564057
}
40574058
ctx.engine_adapter.sync_grants_config(table, target_grants)
40584059

@@ -4061,8 +4062,8 @@ def test_sync_grants_config(ctx: TestContext) -> None:
40614062
roles["writer"],
40624063
roles["admin"],
40634064
}
4064-
assert set(synced_grants.get("UPDATE", [])) == {roles["admin"]}
4065-
assert synced_grants.get("INSERT", []) == []
4065+
assert set(synced_grants.get(update_privilege, [])) == {roles["admin"]}
4066+
assert synced_grants.get(insert_privilege, []) == []
40664067

40674068

40684069
def test_grants_sync_empty_config(ctx: TestContext):
@@ -4072,19 +4073,19 @@ def test_grants_sync_empty_config(ctx: TestContext):
40724073
)
40734074

40744075
table = ctx.table("grants_empty_test")
4075-
4076+
insert_privilege = ctx.get_insert_privilege()
40764077
with ctx.create_users_or_roles("user") as roles:
40774078
ctx.engine_adapter.create_table(table, {"id": exp.DataType.build("INT")})
40784079

40794080
initial_grants = {
40804081
"SELECT": [roles["user"]],
4081-
"INSERT": [roles["user"]],
4082+
insert_privilege: [roles["user"]],
40824083
}
40834084
ctx.engine_adapter.sync_grants_config(table, initial_grants)
40844085

40854086
initial_current_grants = ctx.engine_adapter._get_current_grants_config(table)
40864087
assert roles["user"] in initial_current_grants.get("SELECT", [])
4087-
assert roles["user"] in initial_current_grants.get("INSERT", [])
4088+
assert roles["user"] in initial_current_grants.get(insert_privilege, [])
40884089

40894090
ctx.engine_adapter.sync_grants_config(table, {})
40904091

@@ -4098,18 +4099,12 @@ def test_grants_case_insensitive_grantees(ctx: TestContext):
40984099
f"Skipping Test since engine adapter {ctx.engine_adapter.dialect} doesn't support grants"
40994100
)
41004101

4101-
with ctx.create_users_or_roles("test_reader", "test_writer") as roles:
4102+
with ctx.create_users_or_roles("reader", "writer") as roles:
41024103
table = ctx.table("grants_quoted_test")
41034104
ctx.engine_adapter.create_table(table, {"id": exp.DataType.build("INT")})
41044105

4105-
test_schema = table.db
4106-
for role_credentials in roles.values():
4107-
ctx.engine_adapter.execute(
4108-
f'GRANT USAGE ON SCHEMA "{test_schema}" TO "{role_credentials}"'
4109-
)
4110-
4111-
reader = roles["test_reader"]
4112-
writer = roles["test_writer"]
4106+
reader = roles["reader"]
4107+
writer = roles["writer"]
41134108

41144109
grants_config = {"SELECT": [reader, writer.upper()]}
41154110
ctx.engine_adapter.sync_grants_config(table, grants_config)
@@ -4134,7 +4129,8 @@ def test_grants_plan(ctx: TestContext, tmp_path: Path):
41344129
f"Skipping Test since engine adapter {ctx.engine_adapter.dialect} doesn't support grants"
41354130
)
41364131

4137-
table = ctx.table("grant_model").sql(dialect=ctx.dialect)
4132+
table = ctx.table("grant_model").sql(dialect="duckdb")
4133+
insert_privilege = ctx.get_insert_privilege()
41384134
with ctx.create_users_or_roles("analyst", "etl_user") as roles:
41394135
(tmp_path / "models").mkdir(exist_ok=True)
41404136

@@ -4183,7 +4179,7 @@ def test_grants_plan(ctx: TestContext, tmp_path: Path):
41834179
kind FULL,
41844180
grants (
41854181
'select' = ['{roles["analyst"]}', '{roles["etl_user"]}'],
4186-
'insert' = ['{roles["etl_user"]}']
4182+
'{insert_privilege}' = ['{roles["etl_user"]}']
41874183
),
41884184
grants_target_layer 'all'
41894185
);
@@ -4208,14 +4204,17 @@ def test_grants_plan(ctx: TestContext, tmp_path: Path):
42084204
)
42094205
expected_final_grants = {
42104206
"SELECT": [roles["analyst"], roles["etl_user"]],
4211-
"INSERT": [roles["etl_user"]],
4207+
insert_privilege: [roles["etl_user"]],
42124208
}
42134209
assert set(final_grants.get("SELECT", [])) == set(expected_final_grants["SELECT"])
4214-
assert final_grants.get("INSERT", []) == expected_final_grants["INSERT"]
4210+
assert final_grants.get(insert_privilege, []) == expected_final_grants[insert_privilege]
42154211

42164212
# Virtual layer should also have the updated grants
42174213
updated_virtual_grants = ctx.engine_adapter._get_current_grants_config(
42184214
exp.to_table(view_name, dialect=ctx.dialect)
42194215
)
42204216
assert set(updated_virtual_grants.get("SELECT", [])) == set(expected_final_grants["SELECT"])
4221-
assert updated_virtual_grants.get("INSERT", []) == expected_final_grants["INSERT"]
4217+
assert (
4218+
updated_virtual_grants.get(insert_privilege, [])
4219+
== expected_final_grants[insert_privilege]
4220+
)

0 commit comments

Comments
 (0)