Skip to content

Commit 144bf72

Browse files
authored
perf: Hoist table_metadata at remaining repeat-access (#3301)
## Summary Follow-up to #2674. `Transaction.table_metadata` replays all staged updates via `model_copy(deep=True)` on every access, so reading it (or `spec()`/`schema()` derived from it) repeatedly within a single snapshot-producer method is redundant deep-copy work. #2674 hoisted the property access in `_summary()`; this PR extends the same pattern to three more call sites in `pyiceberg/table/update/snapshot.py` that still read the property more than once per invocation. ## Changes - `_SnapshotProducer._summary`: hoist `spec()`/`schema()` out of the per-data-file loop (they are invariant across files; still called 2× per file before this change) - `_DeleteFiles._compute_deletes`: hoist `table_metadata`/`schema` once at method entry (was 3 accesses — two via `self.schema()` for the metrics evaluators and one direct for `snapshot_by_id`) - `_MergeAppendFiles.__init__`: 3 consecutive `self._transaction.table_metadata.properties` accesses → 1 All hoists are at method entry. Nothing inside these methods stages a transaction update (the `AddSnapshotUpdate` is staged by the caller after `_commit()` returns), so `table_metadata` is invariant for the duration of each method. Not touched here: the `new_manifest_writer(self.spec(id))` calls inside per-manifest loops in `_write_delete_manifest` / `_compute_deletes` / `_OverwriteFiles._existing_manifests` also trigger 2–3 property accesses per iteration via the `schema()`/`spec()`/`new_manifest_writer()` helpers. Those loops are O(partition-groups or rewritten-manifests) rather than O(files), and fixing them cleanly would mean changing the helper signatures — happy to do that in a follow-up if there's interest.
## Testing New `test_snapshot_producer_bounded_metadata_access` wraps `Transaction.table_metadata` with a call counter and asserts: - `_summary()` access count is identical for 10 vs 100 appended files (independent of N), and ≤ 2 - `_MergeAppendFiles.__init__` makes exactly 1 more access than `_FastAppendFiles.__init__` (was 3 before this change — verified the test fails with the production diff reverted) The test constructs `_FastAppendFiles` / `_MergeAppendFiles` directly rather than going through the public append path, since the public path writes manifest avro files; the property-access count it measures is the behaviour under test and doesn't require I/O. Existing `tests/table/test_snapshots.py` tests pass. ## Motivation For appends/deletes/overwrites touching large numbers of files or manifests, the per-iteration property access dominates wall-clock (each access replays the staged-updates list through pydantic `model_copy`). This keeps the cost constant per method call. --------- Co-authored-by: Ruiyang Wang <rynewang@users.noreply.github.com>
1 parent 842d01c commit 144bf72

2 files changed

Lines changed: 58 additions & 11 deletions

File tree

pyiceberg/table/update/snapshot.py

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,8 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary:
228228

229229
# avoid copying metadata for each data file
230230
table_metadata = self._transaction.table_metadata
231+
schema = table_metadata.schema()
232+
default_spec = table_metadata.spec()
231233

232234
partition_summary_limit = int(
233235
table_metadata.properties.get(
@@ -239,8 +241,8 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary:
239241
for data_file in self._added_data_files:
240242
ssc.add_file(
241243
data_file=data_file,
242-
partition_spec=table_metadata.spec(),
243-
schema=table_metadata.schema(),
244+
partition_spec=default_spec,
245+
schema=schema,
244246
)
245247

246248
if len(self._deleted_data_files) > 0:
@@ -249,7 +251,7 @@ def _summary(self, snapshot_properties: dict[str, str] = EMPTY_DICT) -> Summary:
249251
ssc.remove_file(
250252
data_file=data_file,
251253
partition_spec=specs[data_file.spec_id],
252-
schema=table_metadata.schema(),
254+
schema=schema,
253255
)
254256

255257
previous_snapshot = (
@@ -424,12 +426,14 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
424426
data_file=entry.data_file,
425427
)
426428

429+
# avoid copying metadata for each evaluator
430+
table_metadata = self._transaction.table_metadata
431+
schema = table_metadata.schema()
432+
427433
manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
428-
strict_metrics_evaluator = _StrictMetricsEvaluator(
429-
self.schema(), self._predicate, case_sensitive=self._case_sensitive
430-
).eval
434+
strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=self._case_sensitive).eval
431435
inclusive_metrics_evaluator = _InclusiveMetricsEvaluator(
432-
self.schema(), self._predicate, case_sensitive=self._case_sensitive
436+
schema, self._predicate, case_sensitive=self._case_sensitive
433437
).eval
434438

435439
existing_manifests = []
@@ -441,7 +445,7 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
441445
# Should be the current tip of the _target_branch
442446
parent_snapshot_id_for_delete_source = self._parent_snapshot_id
443447
if parent_snapshot_id_for_delete_source is not None:
444-
snapshot = self._transaction.table_metadata.snapshot_by_id(parent_snapshot_id_for_delete_source)
448+
snapshot = table_metadata.snapshot_by_id(parent_snapshot_id_for_delete_source)
445449
if snapshot: # Ensure snapshot is found
446450
for manifest_file in snapshot.manifests(io=self._io):
447451
if manifest_file.content == ManifestContent.DATA:
@@ -542,18 +546,19 @@ def __init__(
542546
from pyiceberg.table import TableProperties
543547

544548
super().__init__(operation, transaction, io, commit_uuid, snapshot_properties, branch)
549+
table_properties = self._transaction.table_metadata.properties
545550
self._target_size_bytes = property_as_int(
546-
self._transaction.table_metadata.properties,
551+
table_properties,
547552
TableProperties.MANIFEST_TARGET_SIZE_BYTES,
548553
TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT,
549554
) # type: ignore
550555
self._min_count_to_merge = property_as_int(
551-
self._transaction.table_metadata.properties,
556+
table_properties,
552557
TableProperties.MANIFEST_MIN_MERGE_COUNT,
553558
TableProperties.MANIFEST_MIN_MERGE_COUNT_DEFAULT,
554559
) # type: ignore
555560
self._merge_enabled = property_as_bool(
556-
self._transaction.table_metadata.properties,
561+
table_properties,
557562
TableProperties.MANIFEST_MERGE_ENABLED,
558563
TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
559564
)

tests/table/test_snapshots.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -551,3 +551,45 @@ def test_latest_ancestor_before_timestamp() -> None:
551551

552552
result = latest_ancestor_before_timestamp(metadata, 1000)
553553
assert result is None
554+
555+
556+
def test_snapshot_producer_bounded_metadata_access(table_v2: Table) -> None:
557+
"""Transaction.table_metadata replays staged updates via update_table_metadata on
558+
every access, so the snapshot producer must not read it once per item. Guards the
559+
hoisting introduced in #2674 and extended here.
560+
"""
561+
from unittest import mock
562+
563+
from pyiceberg.table.update import update_table_metadata
564+
from pyiceberg.table.update.snapshot import _FastAppendFiles, _MergeAppendFiles
565+
566+
def make_file() -> DataFile:
567+
return DataFile.from_args(content=DataFileContent.DATA, record_count=1, file_size_in_bytes=1, partition=Record())
568+
569+
txn = table_v2.transaction()
570+
571+
with mock.patch("pyiceberg.table.update_table_metadata", wraps=update_table_metadata) as spy:
572+
# _summary() cost must not scale with the number of data files
573+
def summary_calls(n_files: int) -> int:
574+
append = _FastAppendFiles(operation=Operation.APPEND, transaction=txn, io=table_v2.io)
575+
for _ in range(n_files):
576+
append.append_data_file(make_file())
577+
spy.reset_mock()
578+
append._summary()
579+
return spy.call_count
580+
581+
few, many = summary_calls(10), summary_calls(100)
582+
assert few == many, f"_summary() update_table_metadata calls scale with file count ({few} vs {many})"
583+
assert many <= 2, f"_summary() triggered {many} update_table_metadata calls; expected O(1)"
584+
585+
# _MergeAppendFiles.__init__ should add exactly one call over _FastAppendFiles.__init__
586+
spy.reset_mock()
587+
_FastAppendFiles(operation=Operation.APPEND, transaction=txn, io=table_v2.io)
588+
fast_init = spy.call_count
589+
spy.reset_mock()
590+
_MergeAppendFiles(operation=Operation.APPEND, transaction=txn, io=table_v2.io)
591+
merge_init = spy.call_count
592+
assert merge_init - fast_init == 1, (
593+
f"_MergeAppendFiles.__init__ made {merge_init - fast_init} extra update_table_metadata "
594+
"calls over its superclass; expected 1 (hoisted)"
595+
)

0 commit comments

Comments
 (0)