Skip to content

Commit e3d019a

Browse files
committed
Fixes #21027: [Snowflake] Add snowflake stages lineage support (#25944)
1 parent be55720 commit e3d019a

5 files changed

Lines changed: 535 additions & 13 deletions

File tree

ingestion/src/metadata/ingestion/lineage/parser.py

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@
1717
from collections import defaultdict
1818
from copy import deepcopy
1919
from logging.config import DictConfigurator
20-
from typing import Any, Dict, List, Optional, Tuple, Union
20+
from typing import Dict, List, Optional, Tuple, Union
2121

2222
import sqlparse
2323
from cached_property import cached_property
2424
from collate_sqllineage import SQLPARSE_DIALECT
25-
from collate_sqllineage.core.models import Column, DataFunction, Table
25+
from collate_sqllineage.core.models import Column, DataFunction, Location, Table
2626
from collate_sqllineage.core.parser.sqlfluff.analyzer import SqlFluffLineageAnalyzer
2727
from collate_sqllineage.core.parser.sqlglot.analyzer import SqlGlotLineageAnalyzer
2828
from collate_sqllineage.exceptions import SQLLineageException
@@ -164,7 +164,7 @@ def intermediate_tables(self) -> List[Table]:
164164
return []
165165

166166
@cached_property
167-
def source_tables(self) -> List[Union[Table, DataFunction]]:
167+
def source_tables(self) -> List[Union[Table, DataFunction, Location]]:
168168
"""
169169
Get a list of source tables
170170
"""
@@ -174,7 +174,7 @@ def source_tables(self) -> List[Union[Table, DataFunction]]:
174174
return []
175175

176176
@cached_property
177-
def target_tables(self) -> List[Table]:
177+
def target_tables(self) -> List[Union[Table, Location]]:
178178
"""
179179
Get a list of target tables
180180
"""
@@ -485,13 +485,15 @@ def table_joins(self) -> Dict[str, List[TableColumnJoin]]:
485485

486486
return join_data
487487

488-
def retrieve_tables(self, tables: List[Any]) -> List[Table]:
488+
def retrieve_tables(
489+
self, tables: List[Union[Table, DataFunction, Location]]
490+
) -> List[Union[Table, DataFunction, Location]]:
489491
if not self._clean_query:
490492
return []
491493
return [
492494
self.clean_table_name(table)
493495
for table in tables
494-
if isinstance(table, (Table, DataFunction))
496+
if isinstance(table, (Table, DataFunction, Location))
495497
]
496498

497499
@classmethod
@@ -520,9 +522,11 @@ def clean_raw_query(cls, raw_query: str) -> Optional[str]:
520522

521523
# We remove queries of the type 'COPY table FROM path' since they are data manipulation statements
522524
# that do not provide value for user. However, we keep Snowflake 'COPY INTO table FROM @stage'
523-
# statements as they provide lineage from stages to tables.
524-
if insensitive_match(clean_query, r"^COPY\s+") and not insensitive_match(
525-
clean_query, r"^COPY\s+INTO\s+.*\s+FROM\s+@"
525+
# and 'COPY INTO @stage FROM table' as they are used for loading/unloading data and are relevant
526+
# for lineage from stages to tables or vice versa.
527+
if insensitive_match(clean_query, r"^COPY\s+") and not (
528+
insensitive_match(clean_query, r"^COPY\s+INTO\s+.*\s+FROM\s+@")
529+
or insensitive_match(clean_query, r"^COPY\s+INTO\s+@.*\s+FROM\s+.*")
526530
):
527531
return None
528532

@@ -739,7 +743,9 @@ def get_sqlparse_lineage_runner(query: str) -> LineageRunner:
739743
return None
740744

741745
@staticmethod
742-
def clean_table_name(table: Table) -> Table:
746+
def clean_table_name(
747+
table: Union[Table, DataFunction, Location],
748+
) -> Union[Table, DataFunction, Location]:
743749
"""
744750
Clean table name by:
745751
- Removing brackets from the beginning and end of the table and schema name
@@ -761,4 +767,15 @@ def clean_table_name(table: Table) -> Table:
761767
clean_table.schema.raw_name = insensitive_replace(
762768
clean_table.schema.raw_name, r"\[(.*)\]", r"\1"
763769
)
770+
# Remove leading @ from the location storage objects if present as they are
771+
# not used while ingesting location storage objects in OpenMetadata
772+
# ex. @STAGE_01 -> STAGE_01 (snowflake stage object)
773+
if (
774+
isinstance(clean_table, Location)
775+
and clean_table.raw_name
776+
and insensitive_match(clean_table.raw_name, r"@.*")
777+
):
778+
clean_table.raw_name = insensitive_replace(
779+
clean_table.raw_name, r"@(.*)", r"\1"
780+
)
764781
return clean_table

ingestion/src/metadata/ingestion/source/database/snowflake/lineage.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class SnowflakeLineageSource(
4545

4646
filters = """
4747
AND (
48-
QUERY_TYPE IN ('MERGE', 'UPDATE','CREATE_TABLE_AS_SELECT','COPY')
48+
QUERY_TYPE IN ('MERGE', 'UPDATE','CREATE_TABLE_AS_SELECT','COPY','UNLOAD')
4949
OR (QUERY_TYPE = 'INSERT' and query_text ILIKE '%%insert%%into%%select%%')
5050
OR (QUERY_TYPE = 'ALTER' and query_text ILIKE '%%alter%%table%%swap%%')
5151
OR (QUERY_TYPE = 'CREATE_TABLE' and query_text ILIKE '%%clone%%')

ingestion/tests/unit/lineage/queries/test_specific_dialect_queries.py

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@
105105
from unittest import TestCase
106106

107107
import pytest
108-
from collate_sqllineage.core.models import Path
108+
from collate_sqllineage.core.models import Location, Path
109109

110110
from ingestion.tests.unit.lineage.queries.helpers import (
111111
TestColumnQualifierTuple,
@@ -1090,3 +1090,83 @@ def test_oracle_create_procedure_insert_select(self):
10901090
test_sqlfluff=False,
10911091
test_sqlparse=False,
10921092
)
1093+
1094+
# -------------------------------------------------------------------------
1095+
# Snowflake Stage Lineage Tests (COPY INTO @stage / COPY INTO table FROM @stage)
1096+
# -------------------------------------------------------------------------
1097+
1098+
def test_snowflake_copy_into_table_from_stage(self):
1099+
"""Test Snowflake COPY INTO table FROM @stage (loading data from stage)"""
1100+
query = "COPY INTO wine_quality FROM @demo FILE_FORMAT = wine_csv_format;"
1101+
1102+
assert_table_lineage_equal(
1103+
query,
1104+
{Location("@demo")},
1105+
{"wine_quality"},
1106+
dialect=Dialect.SNOWFLAKE.value,
1107+
)
1108+
1109+
# No column lineage expected for COPY INTO stage operations
1110+
assert_column_lineage_equal(
1111+
query,
1112+
[],
1113+
dialect=Dialect.SNOWFLAKE.value,
1114+
)
1115+
1116+
def test_snowflake_copy_into_stage_from_table(self):
1117+
"""Test Snowflake COPY INTO @stage FROM table (unloading data to stage)"""
1118+
query = "COPY INTO @my_stage FROM my_table"
1119+
1120+
assert_table_lineage_equal(
1121+
query,
1122+
{"my_table"},
1123+
{Location("@my_stage")},
1124+
dialect=Dialect.SNOWFLAKE.value,
1125+
)
1126+
1127+
assert_column_lineage_equal(
1128+
query,
1129+
[],
1130+
dialect=Dialect.SNOWFLAKE.value,
1131+
)
1132+
1133+
def test_snowflake_copy_into_stage_from_select(self):
1134+
"""Test Snowflake COPY INTO @stage FROM (SELECT ...) - unload with subquery"""
1135+
query = "COPY INTO @db.schema.my_stage FROM (SELECT col1, col2 FROM my_table)"
1136+
1137+
assert_table_lineage_equal(
1138+
query,
1139+
{"my_table"},
1140+
{Location("@db.schema.my_stage")},
1141+
dialect=Dialect.SNOWFLAKE.value,
1142+
# SqlParse builds a different internal graph structure for subqueries
1143+
# (5 nodes vs 3 nodes) even though final lineage results are identical
1144+
skip_graph_check=True,
1145+
)
1146+
1147+
assert_column_lineage_equal(
1148+
query,
1149+
[],
1150+
dialect=Dialect.SNOWFLAKE.value,
1151+
# SqlGlot and SqlFluff/SqlParse produce different graph structures for subqueries
1152+
skip_graph_check=True,
1153+
)
1154+
1155+
def test_snowflake_copy_into_fully_qualified_stage(self):
1156+
"""Test COPY INTO table FROM @db.schema.stage with fully qualified stage name"""
1157+
query = (
1158+
"COPY INTO my_table FROM @my_db.my_schema.my_stage FILE_FORMAT=(TYPE=CSV)"
1159+
)
1160+
1161+
assert_table_lineage_equal(
1162+
query,
1163+
{Location("@my_db.my_schema.my_stage")},
1164+
{"my_table"},
1165+
dialect=Dialect.SNOWFLAKE.value,
1166+
)
1167+
1168+
assert_column_lineage_equal(
1169+
query,
1170+
[],
1171+
dialect=Dialect.SNOWFLAKE.value,
1172+
)

ingestion/tests/unit/lineage/test_sql_lineage.py

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
from unittest import TestCase
1717

1818
import pytest
19+
from collate_sqllineage.core.models import Location
20+
from collate_sqllineage.core.models import Table as LineageTable
1921

2022
from metadata.generated.schema.entity.data.table import Table
2123
from metadata.ingestion.lineage.models import Dialect
@@ -335,3 +337,130 @@ def test_copy_to_statements_filtered(self):
335337
result,
336338
f"Query should be filtered: {query}",
337339
)
340+
341+
# -------------------------------------------------------------------------
342+
# Snowflake Stage Lineage Tests
343+
# -------------------------------------------------------------------------
344+
345+
def test_copy_into_stage_from_table_not_filtered(self):
346+
"""
347+
Test that Snowflake COPY INTO @stage FROM table (unload) statements
348+
are NOT filtered out, as they provide lineage from tables to stages.
349+
"""
350+
snowflake_unload_queries = [
351+
"COPY INTO @my_stage FROM my_table",
352+
"COPY INTO @db.schema.stage FROM (SELECT * FROM t)",
353+
"copy into @stage/path FROM table1",
354+
"COPY INTO @~/ FROM my_table FILE_FORMAT = (TYPE = CSV COMPRESSION = GZIP)",
355+
"COPY INTO @~/staged FROM sales_data",
356+
"COPY INTO @my_stage/daily/2024/ FROM reporting.public.daily_metrics",
357+
"COPY INTO @external_stage/path/ FROM (SELECT col1 FROM src_table WHERE id > 100)",
358+
]
359+
360+
for query in snowflake_unload_queries:
361+
result = LineageParser.clean_raw_query(query)
362+
self.assertIsNotNone(
363+
result,
364+
f"COPY INTO @stage FROM table should NOT be filtered: {query}",
365+
)
366+
367+
def test_stage_lineage_source_as_location_type(self):
368+
"""
369+
Verify that COPY INTO table FROM @stage returns Location as source
370+
and Table as target with correct types.
371+
"""
372+
query = "COPY INTO wine_quality FROM @demo FILE_FORMAT = wine_csv_format;"
373+
parser = LineageParser(query, dialect=Dialect.SNOWFLAKE)
374+
375+
self.assertEqual(len(parser.source_tables), 1)
376+
self.assertEqual(len(parser.target_tables), 1)
377+
self.assertIsInstance(parser.source_tables[0], Location)
378+
self.assertNotIsInstance(parser.source_tables[0], LineageTable)
379+
380+
def test_stage_lineage_target_as_location_type(self):
381+
"""
382+
Verify that COPY INTO @stage FROM table returns Table as source
383+
and Location as target with correct types.
384+
"""
385+
query = "COPY INTO @my_stage FROM my_table"
386+
parser = LineageParser(query, dialect=Dialect.SNOWFLAKE)
387+
388+
self.assertEqual(len(parser.source_tables), 1)
389+
self.assertEqual(len(parser.target_tables), 1)
390+
self.assertNotIsInstance(parser.target_tables[0], LineageTable)
391+
self.assertIsInstance(parser.target_tables[0], Location)
392+
393+
def test_stage_lineage_fully_qualified_names(self):
394+
"""
395+
Test stage lineage with fully qualified database.schema.stage names
396+
for both source and target directions.
397+
"""
398+
# Stage as source (loading data into table)
399+
query_load = "COPY INTO db.schema.target_table FROM @db.schema.my_stage"
400+
parser_load = LineageParser(query_load, dialect=Dialect.SNOWFLAKE)
401+
402+
self.assertEqual(len(parser_load.source_tables), 1)
403+
self.assertEqual(len(parser_load.target_tables), 1)
404+
self.assertIsInstance(parser_load.source_tables[0], Location)
405+
self.assertEqual(str(parser_load.source_tables[0]), "db.schema.my_stage")
406+
self.assertEqual(str(parser_load.target_tables[0]), "db.schema.target_table")
407+
408+
# Stage as target (unloading data from table)
409+
query_unload = "COPY INTO @db.schema.my_stage FROM db.schema.source_table"
410+
parser_unload = LineageParser(query_unload, dialect=Dialect.SNOWFLAKE)
411+
412+
self.assertEqual(len(parser_unload.source_tables), 1)
413+
self.assertEqual(len(parser_unload.target_tables), 1)
414+
self.assertIsInstance(parser_unload.target_tables[0], Location)
415+
self.assertEqual(str(parser_unload.source_tables[0]), "db.schema.source_table")
416+
self.assertEqual(str(parser_unload.target_tables[0]), "db.schema.my_stage")
417+
418+
def test_stage_lineage_unload_with_select_subquery(self):
419+
"""
420+
Test COPY INTO @stage FROM (SELECT ...) extracts the underlying
421+
source table correctly from the subquery.
422+
"""
423+
query = (
424+
"COPY INTO @external_stage/path/ FROM "
425+
"(SELECT col1, col2 FROM db.schema.source_table WHERE id > 100)"
426+
)
427+
parser = LineageParser(query, dialect=Dialect.SNOWFLAKE)
428+
429+
self.assertEqual(len(parser.source_tables), 1)
430+
self.assertEqual(len(parser.target_tables), 1)
431+
self.assertEqual(str(parser.source_tables[0]), "db.schema.source_table")
432+
self.assertIsInstance(parser.target_tables[0], Location)
433+
434+
def test_stage_lineage_user_stage(self):
435+
"""
436+
Test COPY INTO with user stage (@~/) is properly handled.
437+
"""
438+
query = "COPY INTO @~/ FROM my_table FILE_FORMAT = (TYPE = CSV)"
439+
parser = LineageParser(query, dialect=Dialect.SNOWFLAKE)
440+
441+
self.assertEqual(len(parser.source_tables), 1)
442+
self.assertEqual(len(parser.target_tables), 1)
443+
self.assertIsInstance(parser.source_tables[0], LineageTable)
444+
self.assertIsInstance(parser.target_tables[0], Location)
445+
446+
def test_stage_lineage_with_file_format_options(self):
447+
"""
448+
Test that file format options don't interfere with lineage parsing.
449+
"""
450+
queries = [
451+
"COPY INTO my_table FROM @stage FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1)",
452+
"COPY INTO my_table FROM @stage FILE_FORMAT = wine_csv_format",
453+
"COPY INTO @stage FROM my_table FILE_FORMAT = (TYPE = PARQUET)",
454+
"COPY INTO my_table FROM @stage PATTERN='.*[.]csv'",
455+
]
456+
457+
for query in queries:
458+
parser = LineageParser(query, dialect=Dialect.SNOWFLAKE)
459+
self.assertTrue(
460+
len(parser.source_tables) > 0,
461+
f"Expected source tables for query: {query}",
462+
)
463+
self.assertTrue(
464+
len(parser.target_tables) > 0,
465+
f"Expected target tables for query: {query}",
466+
)

0 commit comments

Comments
 (0)