Skip to content

Commit 2c03620

Browse files
committed
Fix: Optimise redshift memory usage (#25904)
1 parent 1ce262c commit 2c03620

2 files changed

Lines changed: 53 additions & 19 deletions

File tree

ingestion/src/metadata/ingestion/source/database/redshift/metadata.py

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -190,27 +190,47 @@ def get_location_path(self, table_name: str, schema_name: str) -> Optional[str]:
190190
(self.context.get().database, schema_name, table_name)
191191
)
192192

193-
def get_partition_details(self) -> None:
193+
def get_partition_details(self, schema_name: Optional[str] = None) -> None:
194194
"""
195-
Populate partition details
195+
Populate partition details for the given schema (or all schemas if None).
196196
"""
197197
try:
198198
self.partition_details.clear()
199-
results = self.connection.execute(
200-
statement=REDSHIFT_PARTITION_DETAILS
201-
).fetchall()
199+
query = REDSHIFT_PARTITION_DETAILS
200+
if schema_name:
201+
query += f" AND \"schema\" = '{schema_name}'"
202+
results = self.connection.execute(statement=query).fetchall()
202203
for row in results:
203204
self.partition_details[f"{row.schema}.{row.table}"] = row.diststyle
204205
except Exception as exe:
205206
logger.debug(traceback.format_exc())
206207
logger.debug(f"Failed to fetch partition details due: {exe}")
207208

209+
def _clear_reflection_cache(self) -> None:
210+
"""Clear the SQLAlchemy inspector's info_cache to release
211+
cached column / relation data from prior schemas.
212+
213+
This prevents unbounded memory growth when ingesting many
214+
schemas, since _get_schema_column_info, get_columns, and
215+
_get_all_relation_info all use @reflection.cache.
216+
"""
217+
try:
218+
if hasattr(self.inspector, "info_cache"):
219+
self.inspector.info_cache.clear()
220+
except Exception as exc:
221+
logger.debug(f"Failed to clear reflection cache: {exc}")
222+
208223
def query_table_names_and_types(
209224
self, schema_name: str
210225
) -> Iterable[TableNameAndType]:
211226
"""
212227
Handle custom table types
213228
"""
229+
# Clear cached column / relation data from prior schemas to
230+
# prevent unbounded memory growth (issue #20649)
231+
self._clear_reflection_cache()
232+
233+
self.get_partition_details(schema_name)
214234
self._set_constraint_details(schema_name)
215235

216236
result = self.connection.execute(
@@ -305,7 +325,6 @@ def set_external_location_map(self, database_name: str) -> None:
305325
def get_database_names(self) -> Iterable[str]:
306326
if not self.config.serviceConnection.root.config.ingestAllDatabases:
307327
configured_db = self.config.serviceConnection.root.config.database
308-
self.get_partition_details()
309328
self._set_incremental_table_processor(configured_db)
310329
self.set_external_location_map(configured_db)
311330
yield configured_db
@@ -331,7 +350,6 @@ def get_database_names(self) -> Iterable[str]:
331350

332351
try:
333352
self.set_inspector(database_name=new_database)
334-
self.get_partition_details()
335353
self._set_incremental_table_processor(new_database)
336354
self.set_external_location_map(new_database)
337355
yield new_database

ingestion/src/metadata/ingestion/source/database/redshift/utils.py

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151

5252
# pylint: disable=protected-access
5353
@calculate_execution_time()
54-
@reflection.cache
5554
def get_columns(self, connection, table_name, schema=None, **kw):
5655
"""
5756
Return information about columns in `table_name`.
@@ -61,6 +60,10 @@ def get_columns(self, connection, table_name, schema=None, **kw):
6160
6261
overriding the default dialect method to include the
6362
distkey and sortkey info
63+
64+
Note: @reflection.cache removed to avoid unbounded memory growth
65+
across schemas (issue #20649). The underlying
66+
_get_schema_column_info already caches per-schema.
6467
"""
6568
cols = self._get_redshift_columns(connection, table_name, schema, **kw)
6669
if not self._domains:
@@ -121,22 +124,21 @@ def _get_column_info(self, *args, **kwargs):
121124

122125

123126
@calculate_execution_time()
124-
@reflection.cache
125127
def _get_schema_column_info(
126128
self, connection, schema=None, **kw
127129
): # pylint: disable=unused-argument
128130
"""
129131
Get schema column info
130132
131-
Args:
132-
connection:
133-
schema:
134-
**kw:
135-
Returns:
136-
137-
This method is responsible for fetching all the column details like
138-
name, type, constraints, distkey and sortkey etc.
133+
Uses a custom single-schema cache instead of @reflection.cache
134+
to prevent unbounded memory growth across schemas (issue #20649).
135+
Only the most recently requested schema's data is retained.
139136
"""
137+
# Single-schema cache: invalidate when schema changes
138+
cached = getattr(self, "_schema_col_cache", None)
139+
if cached is not None and cached[0] == schema:
140+
return cached[1]
141+
140142
schema_clause = f"AND schema = '{schema if schema else ''}'"
141143
all_columns = defaultdict(list)
142144
result = connection.execute(
@@ -145,7 +147,10 @@ def _get_schema_column_info(
145147
for col in result:
146148
key = RelationKey(col.table_name, col.schema, connection)
147149
all_columns[key].append(col)
148-
return dict(all_columns)
150+
result.close()
151+
data = dict(all_columns)
152+
self._schema_col_cache = (schema, data)
153+
return data
149154

150155

151156
def _handle_array_type(attype):
@@ -377,10 +382,19 @@ def get_table_comment(
377382

378383

379384
@calculate_execution_time()
380-
@reflection.cache
381385
def _get_all_relation_info(self, connection, **kw): # pylint: disable=unused-argument
386+
"""
387+
Uses a custom single-schema cache instead of @reflection.cache
388+
to prevent unbounded memory growth across schemas (issue #20649).
389+
"""
382390
# pylint: disable=consider-using-f-string
383391
schema = kw.get("schema", None)
392+
393+
# Single-schema cache: invalidate when schema changes
394+
cached = getattr(self, "_relation_info_cache", None)
395+
if cached is not None and cached[0] == schema:
396+
return cached[1]
397+
384398
schema_clause = "AND schema = '{schema}'".format(schema=schema) if schema else ""
385399

386400
table_name = kw.get("table_name", None)
@@ -399,6 +413,8 @@ def _get_all_relation_info(self, connection, **kw): # pylint: disable=unused-ar
399413
for rel in result:
400414
key = RelationKey(rel.relname, rel.schema, connection)
401415
relations[key] = rel
416+
result.close()
417+
self._relation_info_cache = (schema, relations)
402418
return relations
403419

404420

0 commit comments

Comments
 (0)