Skip to content

Commit b057212

Browse files
committed
[Lineage/Usage] Preserve integer ordinals in GROUP/ORDER BY clauses in query masking (#26303)
* [Lineage/Usage] Preserve integer ordinals in GROUP/ORDER BY clauses in query masking * Add and reorganize the masker tests * Preserve ordinals in GROUPING SETS masking * Handle Keyword.Order explicitly in sqlparse masker * Defensively allowlist SETS/CUBE/ROLLUP in keyword guard and add ROLLUP/CUBE regression tests
1 parent 5262b88 commit b057212

7 files changed

Lines changed: 1495 additions & 195 deletions

File tree

ingestion/src/metadata/ingestion/lineage/masker.py

Lines changed: 67 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from collate_sqllineage.core.parser.sqlparse.analyzer import SqlParseLineageAnalyzer
2525
from collate_sqllineage.runner import LineageRunner
2626
from sqlparse.sql import Comparison
27-
from sqlparse.tokens import Literal, Number, String
27+
from sqlparse.tokens import Keyword, Literal, Number, String
2828

2929
from metadata.ingestion.lineage.models import Dialect
3030
from metadata.utils.execution_time_tracker import (
@@ -51,7 +51,16 @@ def mask_literals_with_sqlparse(
5151
try:
5252
parsed = parser._parsed_result
5353

54-
def mask_token(token):
54+
def _is_integer_literal(token) -> bool:
55+
"""Check if a token is an integer literal (positional reference candidate)."""
56+
return token.ttype is Number.Integer
57+
58+
def mask_token(token, in_groupby_orderby=False):
59+
"""Mask literal tokens, preserving integer ordinal references in
60+
GROUP BY and ORDER BY clauses."""
61+
# Skip masking integer positional references in GROUP BY / ORDER BY
62+
if in_groupby_orderby and _is_integer_literal(token):
63+
return
5564
# Mask all literals: strings, numbers, or other literal values
5665
if token.ttype in (
5766
String,
@@ -62,18 +71,40 @@ def mask_token(token):
6271
):
6372
token.value = MASK_TOKEN
6473
elif token.is_group:
65-
# Recursively process grouped tokens
66-
for t in token.tokens:
67-
mask_token(t)
68-
69-
# Process all tokens
70-
for token in parsed.tokens:
71-
if isinstance(token, Comparison):
72-
# In comparisons, mask both sides if literals
73-
for t in token.tokens:
74-
mask_token(t)
75-
else:
76-
mask_token(token)
74+
# Recursively process grouped tokens with clause context
75+
_mask_group_tokens(token.tokens, in_groupby_orderby)
76+
77+
def _mask_group_tokens(tokens, in_groupby_orderby=False):
78+
"""Walk sibling tokens, tracking GROUP BY / ORDER BY context."""
79+
current_in_groupby_orderby = in_groupby_orderby
80+
for token in tokens:
81+
# Track when we enter/leave GROUP BY or ORDER BY clauses
82+
if token.ttype is Keyword and token.value.upper() in (
83+
"GROUP BY",
84+
"ORDER BY",
85+
):
86+
current_in_groupby_orderby = True
87+
elif token.ttype is Keyword.Order:
88+
# ASC, DESC, NULLS FIRST, NULLS LAST — keep context
89+
pass
90+
elif token.ttype is Keyword and token.value.upper() not in (
91+
"GROUPING",
92+
"SETS",
93+
"CUBE",
94+
"ROLLUP",
95+
):
96+
# Any other keyword resets the context (HAVING, LIMIT, WHERE, etc.)
97+
current_in_groupby_orderby = False
98+
99+
if isinstance(token, Comparison):
100+
# In comparisons, always mask literals (e.g. HAVING COUNT(*) > 1)
101+
for t in token.tokens:
102+
mask_token(t, in_groupby_orderby=False)
103+
else:
104+
mask_token(token, current_in_groupby_orderby)
105+
106+
# Process all tokens starting from the top-level statement
107+
_mask_group_tokens(parsed.tokens)
77108

78109
# Return the formatted masked query
79110
return str(parsed)
@@ -105,14 +136,33 @@ def mask_literals_with_sqlfluff(
105136
)
106137
return query
107138

108-
def replace_literals(segment):
109-
"""Recursively replace literals with placeholders."""
139+
def _is_ordinal_context(segment) -> bool:
140+
"""Check if the segment is a GROUP BY or ORDER BY clause."""
141+
return segment.is_type("groupby_clause", "orderby_clause")
142+
143+
def _is_integer_ordinal(segment) -> bool:
144+
"""Check if a numeric_literal segment is an integer ordinal reference.
145+
Only pure integers (no decimal point) are valid positional references."""
146+
return segment.is_type("numeric_literal") and segment.raw.isdigit()
147+
148+
def replace_literals(segment, in_groupby_orderby=False):
149+
"""Recursively replace literals with placeholders,
150+
preserving integer ordinal references in GROUP BY / ORDER BY."""
151+
# Detect GROUP BY / ORDER BY clause context
152+
if _is_ordinal_context(segment):
153+
in_groupby_orderby = True
154+
155+
# Skip masking integer positional references in GROUP BY / ORDER BY
156+
if in_groupby_orderby and _is_integer_ordinal(segment):
157+
return segment.raw
158+
110159
if segment.is_type("literal", "quoted_literal", "numeric_literal"):
111160
return MASK_TOKEN
112161
if segment.segments:
113162
# Recursively process sub-segments
114163
return "".join(
115-
replace_literals(sub_seg) for sub_seg in segment.segments
164+
replace_literals(sub_seg, in_groupby_orderby)
165+
for sub_seg in segment.segments
116166
)
117167
return segment.raw
118168

ingestion/tests/unit/lineage/masker/__init__.py

Whitespace-only changes.
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
# Copyright 2025 Collate
2+
# Licensed under the Collate Community License, Version 1.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
6+
# Unless required by applicable law or agreed to in writing, software
7+
# distributed under the License is distributed on an "AS IS" BASIS,
8+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
9+
# See the License for the specific language governing permissions and
10+
# limitations under the License.
11+
12+
"""
13+
Test helpers for SQL query masking tests.
14+
"""
15+
16+
from collate_sqllineage.core.parser.sqlfluff.analyzer import SqlFluffLineageAnalyzer
17+
from collate_sqllineage.core.parser.sqlglot.analyzer import SqlGlotLineageAnalyzer
18+
from collate_sqllineage.core.parser.sqlparse.analyzer import SqlParseLineageAnalyzer
19+
from collate_sqllineage.runner import LineageRunner
20+
21+
from metadata.ingestion.lineage.masker import mask_query, masked_query_cache
22+
23+
PARSER_MAP = {
24+
"SqlGlot": SqlGlotLineageAnalyzer,
25+
"SqlFluff": SqlFluffLineageAnalyzer,
26+
"SqlParse": SqlParseLineageAnalyzer,
27+
}
28+
29+
30+
def assert_masked_query(sql: str, masked_query: str, dialect: str, parser_name: str):
31+
"""
32+
Helper function to test query masking with a specific parser and assert the result.
33+
34+
:param sql: SQL statement to test
35+
:param masked_query: Expected masked query
36+
:param dialect: SQL dialect to use (for SqlGlot and SqlFluff)
37+
:param parser_name: Name of parser being tested (for error messages)
38+
"""
39+
analyzer_class = PARSER_MAP[parser_name]
40+
41+
parser_prefix = f"[{parser_name}] " if parser_name else ""
42+
43+
# clear cache before each test
44+
masked_query_cache.clear()
45+
46+
parser = LineageRunner(sql, dialect=dialect, analyzer=analyzer_class)
47+
len(parser.source_tables) # Force parsing
48+
49+
actual = mask_query(sql, dialect=dialect, parser=parser)
50+
expected = masked_query
51+
52+
assert actual == expected, (
53+
f"\n\t{parser_prefix}Expected Masked Query: {expected}"
54+
f"\n\t{parser_prefix}Actual Masked Query: {actual}"
55+
)

ingestion/tests/unit/lineage/test_query_masker.py renamed to ingestion/tests/unit/lineage/masker/test_query_masker.py

Lines changed: 26 additions & 149 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,14 @@
1010
# limitations under the License.
1111

1212
"""
13-
Query masking tests
13+
Query masking tests — core masking logic
1414
1515
Tests for masking SQL queries with different parsers (SqlGlot, SqlFluff, SqlParse).
16-
Since all parsers now use SqlParse for masking, they should produce identical output.
16+
Covers: parser dispatch, caching, literal types, ordinal preservation edge cases.
1717
"""
1818
from unittest import TestCase
1919

20-
import pytest
21-
22-
from ingestion.tests.unit.lineage.queries.helpers import assert_masked_query
20+
from ingestion.tests.unit.lineage.masker.helpers import assert_masked_query
2321
from metadata.ingestion.lineage.masker import mask_query, masked_query_cache
2422
from metadata.ingestion.lineage.models import Dialect
2523

@@ -290,174 +288,53 @@ def test_masking_when_no_parser_but_required(self):
290288

291289
assert masked_query is None
292290

293-
# Dialect specific query masking tests
294-
295-
def test_postgres_typed_literals_nested_subquery(self):
291+
def test_group_by_order_by_ordinal_positions_preserved(self):
296292
"""
297-
Test masking of Postgres typed literals in nested subqueries.
298-
"""
299-
query_test_cases = [
300-
{
301-
"query": "SELECT * FROM orders WHERE order_date = DATE '2023-10-01' AND customer_id IN (SELECT id FROM customers WHERE signup_date = TIMESTAMP '2022-01-15 10:30:00');", # noqa: E501
302-
"expected": "SELECT * FROM orders WHERE order_date = DATE ? AND customer_id IN (SELECT id FROM customers WHERE signup_date = TIMESTAMP ?);", # noqa: E501
303-
"dialect": Dialect.POSTGRES.value,
304-
}
305-
]
293+
Test that integer ordinal positions in GROUP BY and ORDER BY clauses
294+
are NOT masked, while other literals (WHERE, HAVING, LIMIT, OFFSET) still are.
306295
307-
for test_case in query_test_cases:
308-
assert_masked_query(
309-
test_case["query"],
310-
test_case["expected"],
311-
test_case["dialect"],
312-
"SqlGlot",
313-
)
314-
assert_masked_query(
315-
test_case["query"],
316-
test_case["expected"],
317-
test_case["dialect"],
318-
"SqlFluff",
319-
)
320-
assert_masked_query(
321-
test_case["query"],
322-
test_case["expected"],
323-
test_case["dialect"],
324-
"SqlParse",
325-
)
326-
327-
def test_mysql_implicit_typing_functions_limits_column_if(self):
328-
"""
329-
Test masking of MySQL implicit typing functions like IF, LIMIT.
296+
This addresses a reported issue where positional references like
297+
GROUP BY 1, 2, 3 were being incorrectly masked to GROUP BY ?, ?, ?
330298
"""
331299
query_test_cases = [
332300
{
333-
"query": "SELECT IF(status = 'active', 1, 0) AS is_active, DATE(created_at) AS created_day FROM accounts WHERE score > 99.5 AND created_at BETWEEN '2024-01-01' AND '2024-12-31' ORDER BY created_at DESC LIMIT 10 OFFSET 5;", # noqa: E501
334-
"expected": "SELECT IF(status = ?, ?, ?) AS is_active, DATE(created_at) AS created_day FROM accounts WHERE score > ? AND created_at BETWEEN ? AND ? ORDER BY created_at DESC LIMIT ? OFFSET ?;", # noqa: E501
335-
"dialect": Dialect.MYSQL.value,
336-
}
337-
]
338-
339-
for test_case in query_test_cases:
340-
assert_masked_query(
341-
test_case["query"],
342-
test_case["expected"],
343-
test_case["dialect"],
344-
"SqlGlot",
345-
)
346-
assert_masked_query(
347-
test_case["query"],
348-
test_case["expected"],
349-
test_case["dialect"],
350-
"SqlFluff",
351-
)
352-
assert_masked_query(
353-
test_case["query"],
354-
test_case["expected"],
355-
test_case["dialect"],
356-
"SqlParse",
357-
)
358-
359-
def test_bigquery_struct_array_unnest(self):
360-
"""
361-
Test masking of BigQuery STRUCTs, ARRAYs, and UNNEST.
362-
"""
363-
query_test_cases = [
301+
"query": "SELECT a, b, c FROM t WHERE x > 5 GROUP BY 1, 2, 3 HAVING COUNT(*) > 1 ORDER BY 1 ASC LIMIT 500 OFFSET 0;", # noqa: E501
302+
"expected": "SELECT a, b, c FROM t WHERE x > ? GROUP BY 1, 2, 3 HAVING COUNT(*) > ? ORDER BY 1 ASC LIMIT ? OFFSET ?;", # noqa: E501
303+
"dialect": Dialect.ANSI.value,
304+
},
364305
{
365-
"query": "SELECT u.name, u.age, a.city FROM UNNEST([STRUCT('alice' AS name, 25 AS age, [STRUCT('NY' AS city)])]) AS u, UNNEST(u.f2) AS a WHERE u.age > 21 AND a.city = 'NY';", # noqa: E501
366-
"expected": "SELECT u.name, u.age, a.city FROM UNNEST([STRUCT(? AS name, ? AS age, [STRUCT(? AS city)])]) AS u, UNNEST(u.f2) AS a WHERE u.age > ? AND a.city = ?;", # noqa: E501
367-
"dialect": Dialect.BIGQUERY.value,
306+
# Mixed column names and positional integers in GROUP BY / ORDER BY
307+
"query": "SELECT a, b FROM t GROUP BY a, 2 ORDER BY b, 1 DESC;",
308+
"expected": "SELECT a, b FROM t GROUP BY a, 2 ORDER BY b, 1 DESC;",
309+
"dialect": Dialect.ANSI.value,
368310
},
369-
]
370-
371-
for test_case in query_test_cases:
372-
# TODO: Not masking `'NY'` inside STRUCT whereas `'alice'` and `25` are masked, need to validate
373-
# assert_masked_query(
374-
# test_case["query"],
375-
# test_case["expected"],
376-
# test_case["dialect"],
377-
# "SqlGlot",
378-
# )
379-
assert_masked_query(
380-
test_case["query"],
381-
test_case["expected"],
382-
test_case["dialect"],
383-
"SqlFluff",
384-
)
385-
# TODO: Not masking `'NY'` inside STRUCT whereas `'alice'` and `25` are masked, need to validate
386-
# assert_masked_query(
387-
# test_case["query"],
388-
# test_case["expected"],
389-
# test_case["dialect"],
390-
# "SqlParse",
391-
# )
392-
393-
def test_snowflake_variant_json_casting(self):
394-
"""
395-
Test masking of Snowflake VARIANT/JSON data types and casting.
396-
"""
397-
query_test_cases = [
398311
{
399-
"query": "SELECT data:id AS user_id, data:profile.name AS user_name, data:profile.age::INT AS user_age FROM events WHERE data:profile.age > 30 AND data:profile.status = 'active';", # noqa: E501
400-
"expected": "SELECT data:id AS user_id, data:profile.name AS user_name, data:profile.age::INT AS user_age FROM events WHERE data:profile.age > ? AND data:profile.status = ?;", # noqa: E501
401-
"dialect": Dialect.SNOWFLAKE.value,
402-
}
403-
]
404-
405-
for test_case in query_test_cases:
406-
assert_masked_query(
407-
test_case["query"],
408-
test_case["expected"],
409-
test_case["dialect"],
410-
"SqlGlot",
411-
)
412-
assert_masked_query(
413-
test_case["query"],
414-
test_case["expected"],
415-
test_case["dialect"],
416-
"SqlFluff",
417-
)
418-
assert_masked_query(
419-
test_case["query"],
420-
test_case["expected"],
421-
test_case["dialect"],
422-
"SqlParse",
423-
)
424-
425-
@pytest.mark.skip(
426-
reason="SqlGlot and SqlFluff do not support DECLARE statement type yet."
427-
" Additionally multi-statement handling needs to be evaluated later."
428-
)
429-
def test_tsql_variables_convert(self):
430-
"""
431-
Test masking of T-SQL variables and CONVERT function.
432-
"""
433-
query_test_cases = [
312+
# CTE with GROUP BY positional references (similar to reported Payoneer query)
313+
"query": "WITH cte AS (SELECT a FROM t WHERE x = 'val' GROUP BY 1, 2, 3 HAVING COUNT(*) > 1 ORDER BY 1 ASC) SELECT * FROM cte LIMIT 500 OFFSET 0;", # noqa: E501
314+
"expected": "WITH cte AS (SELECT a FROM t WHERE x = ? GROUP BY 1, 2, 3 HAVING COUNT(*) > ? ORDER BY 1 ASC) SELECT * FROM cte LIMIT ? OFFSET ?;", # noqa: E501
315+
"dialect": Dialect.ANSI.value,
316+
},
434317
{
435-
"query": "DECLARE @startDate DATETIME = '2024-01-01'; DECLARE @endDate DATETIME = '2024-12-31'; SELECT * FROM events WHERE event_date BETWEEN @startDate AND @endDate;", # noqa: E501
436-
"expected": "DECLARE @startDate DATETIME = '2024-01-01'; DECLARE @endDate DATETIME = '2024-12-31'; SELECT * FROM events WHERE event_date BETWEEN ? AND ?;", # noqa: E501
437-
"dialect": Dialect.TSQL.value,
438-
}
318+
# BigQuery dialect with GROUP BY positional references
319+
"query": "SELECT full_name, COUNT(*) AS rn FROM t WHERE role = 'admin' AND dept IN ('a', 'b') GROUP BY 1, 2, 3, 4, 5, 6 HAVING COUNT(*) > 1 ORDER BY 1 ASC LIMIT 500 OFFSET 0;", # noqa: E501
320+
"expected": "SELECT full_name, COUNT(*) AS rn FROM t WHERE role = ? AND dept IN (?, ?) GROUP BY 1, 2, 3, 4, 5, 6 HAVING COUNT(*) > ? ORDER BY 1 ASC LIMIT ? OFFSET ?;", # noqa: E501
321+
"dialect": Dialect.BIGQUERY.value,
322+
},
439323
]
440324

441325
for test_case in query_test_cases:
442-
# TODO: sqlglot doesn't support analyzing statement type [declare] for
443-
# SQL: DECLARE @startDate DATETIME = '2024-01-01';
444326
assert_masked_query(
445327
test_case["query"],
446328
test_case["expected"],
447329
test_case["dialect"],
448330
"SqlGlot",
449331
)
450-
# TODO: sqlfluff doesn't support analyzing statement type [declare] for
451-
# SQL: DECLARE @startDate DATETIME = '2024-01-01';
452332
assert_masked_query(
453333
test_case["query"],
454334
test_case["expected"],
455335
test_case["dialect"],
456336
"SqlFluff",
457337
)
458-
# TODO: since our parser is designed to handle one sql statement at a
459-
# time, it returns last masked statement only, need to evaluate later
460-
# if multi-statement handling is required
461338
assert_masked_query(
462339
test_case["query"],
463340
test_case["expected"],

0 commit comments

Comments
 (0)