Add managed-memory advise, prefetch, and discard-prefetch free functions #1775
rparolin wants to merge 91 commits into
Conversation
/ok to test
question: Does making these member functions of the
I'm moving this back into draft. We discussed it in our team meeting; I was already hesitant because Buffer is becoming a 'God object' with the functionality it is gaining. We were going to explore alternatives, and free functions sound like a good one to explore.
…ns in the cuda.core.managed_memory namespace
…ups, fix docs

- Remove duplicate long-form "cu_mem_advise_*" string aliases from _MANAGED_ADVICE_ALIASES; users pass short strings or the enum directly
- Replace 4 boolean allow_* params in _normalize_managed_location with a single allowed_loctypes frozenset driven by _MANAGED_ADVICE_ALLOWED_LOCTYPES
- Cache immutable runtime checks: CU_DEVICE_CPU, v2 bindings flag, discard_prefetch support, and advice enum-to-alias reverse map
- Collapse hasattr+getattr to a single getattr in _managed_location_enum
- Move _require_managed_discard_prefetch_support to the top of discard_prefetch for fail-fast behavior
- Fix docs build: reset Sphinx module scope after the managed_memory section in api.rst so subsequent sections resolve under cuda.core
- Add discard_prefetch pool-allocation test and a comment on _get_mem_range_attr

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…e legacy path The _V2_BINDINGS cache in _buffer.pyx persists across tests, so monkeypatching get_binding_version alone is insufficient when earlier tests have already populated the cache with the v2 value. Promote _V2_BINDINGS from cdef int to a Python-level variable so tests can monkeypatch it directly via monkeypatch.setattr, and reset it to -1 in both legacy-signature tests. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…t real hardware These three tests call cuMemAdvise on real CUDA devices and verify memory range attributes. On devices without concurrent_managed_access (e.g. Windows/WDDM), set_read_mostly silently no-ops and set_preferred_location fails with CUDA_ERROR_INVALID_DEVICE. Use the stricter _skip_if_managed_location_ops_unsupported guard, matching the pattern already used by test_managed_memory_functions_accept_raw_pointer_ranges. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…s support Reorder checks in discard_prefetch so _normalize_managed_target_range runs before _require_managed_discard_prefetch_support. This ensures non-managed buffers raise ValueError before the RuntimeError for missing cuMemDiscardAndPrefetchBatchAsync support. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ps module Move advise, prefetch, and discard_prefetch functions and their helpers out of _buffer.pyx into a new _managed_memory_ops Cython module to improve separation of concerns. Expose _init_mem_attrs and _query_memory_attrs as non-inline cdef functions in _buffer.pxd so the new module can reuse them. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…m_advise_prefetch # Conflicts: # cuda_core/docs/source/release/1.0.0-notes.rst
- ManagedMemoryResource.allocate: require explicit stream (kw-only), matching the post-NVIDIA#2020 convention across stream-scheduling APIs.
- Batch free functions (discard_batch / prefetch_batch / discard_prefetch_batch): move stream to the first positional argument to mirror launch(stream, ...); add full type annotations.
- Host: drop the redundant __setattr__ guard now that __slots__ alone enforces immutability.
- test_managed_ops.py: extract memory_pool / location_ops / discard_prefetch fixture tiers, eliminating the device + mr + buffer preamble previously copied across most tests.
- test_accessed_by_*: replace the hand-rolled MutableSet pass with helpers.collection_interface_testers.assert_single_member_mutable_set_interface introduced by NVIDIA#2018.
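The `__slots__` point above rests on standard Python behavior: a class declaring `__slots__` (with no `__dict__` anywhere in its bases) rejects assignment to any undeclared attribute natively, so a defensive `__setattr__` override adds nothing for that case. A minimal sketch, with an illustrative class name:

```python
class HostLike:
    # With __slots__, instances have no __dict__; assigning an attribute
    # not listed here raises AttributeError without any __setattr__ guard.
    __slots__ = ("_numa_id",)

    def __init__(self, numa_id=None):
        self._numa_id = numa_id

    @property
    def numa_id(self):
        return self._numa_id

h = HostLike(numa_id=3)
assert h.numa_id == 3
try:
    h.numa_current = True  # undeclared attribute
except AttributeError:
    pass  # rejected natively, no custom __setattr__ needed
```

Note the declared slots themselves stay writable; the guarantee is only against adding new attributes, which covers the public surface when state lives in private slots behind read-only properties.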
Captures the stream-argument shape (kw-only required vs launch-style positional), the __slots__-only immutability convention, and the pytest-fixture / helper-reuse expectations established while addressing PR NVIDIA#1775 review feedback. Future agents should hit these guardrails before writing code that recreates the same issues.
Closes the asymmetry leofang flagged: numa_id was a constructor arg but is_numa_current was only reachable via Host.numa_current(). Both state fields are now uniformly settable through the constructor; Host.numa_current() becomes a thin alias. The two are mutually exclusive — passing both raises ValueError.
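The mutual-exclusion rule described above can be sketched as a standalone check (hypothetical function name, mirroring the description rather than the actual implementation):

```python
def validate_host_args(numa_id=None, is_numa_current=False):
    # The two state fields are mutually exclusive: a host location is
    # either "a specific NUMA node" or "whichever node is current".
    if numa_id is not None and is_numa_current:
        raise ValueError("numa_id and is_numa_current are mutually exclusive")
    return numa_id, is_numa_current

assert validate_host_args() == (None, False)
assert validate_host_args(numa_id=2) == (2, False)
assert validate_host_args(is_numa_current=True) == (None, True)
```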
…m_advise_prefetch
…m_advise_prefetch
Per PR NVIDIA#1775 review feedback. The annotation is redundant since other will always be a Python object; the IDE/typecheckers don't need it spelled out and the convention in this codebase is to leave it implicit.
…ged_buffer Per PR NVIDIA#1775 review feedback. The deferred import was avoiding a hypothetical circular dependency that doesn't exist; lifting to module top removes the per-call import overhead.
Per PR NVIDIA#1775 review feedback. The deferred import was avoiding a hypothetical circular dependency that doesn't exist; lifting it to module top removes the per-call import overhead.
Per PR NVIDIA#1775 review feedback. The two deferred imports inside _decode_location were avoiding a hypothetical circular dependency that doesn't exist; lifting to module top removes the per-call import overhead.
…ource Per PR NVIDIA#1775 review feedback. The deferred import comment claimed a circular-dependency concern but the cycle did not actually exist; lifting to module top removes the per-allocation import overhead.
Per PR NVIDIA#1775 review feedback: this PR targets the cuda.core v1.1.0 milestone, so the new-feature entries belong in a new 1.1.0-notes.rst rather than being appended to the already-released 1.0.0 notes. The release index uses a glob pattern (release/*-notes), so the new file is picked up automatically.
… bindings only Per PR NVIDIA#1775 review feedback (leofang). The cu13 paths in _managed_buffer.preferred_location and _reject_numa_host_on_cuda12 gated on binding_version() >= (13, 0, 0) only, mirroring the regressions that PR NVIDIA#2054 and PR NVIDIA#2064 fixed for other modules. With cuda-bindings 13.x installed but a CUDA 12.x runtime driver, the v2 CU_MEM_RANGE_ATTRIBUTE_PREFERRED_LOCATION_* attributes and the HOST_NUMA{,_CURRENT} location types fail deep in the driver with CUDA_ERROR_INVALID_VALUE. Both checks now AND driver_version() >= (13, 0, 0) into the gate, and the _reject_numa_host_on_cuda12 error message names both required versions.
leofang
left a comment
Approved, assuming all pending comments below will be either tracked or followed up.
```cython
cdef dict _MANAGED_ADVICE_ALIASES = {
    "set_read_mostly": "CU_MEM_ADVISE_SET_READ_MOSTLY",
    "unset_read_mostly": "CU_MEM_ADVISE_UNSET_READ_MOSTLY",
    "set_preferred_location": "CU_MEM_ADVISE_SET_PREFERRED_LOCATION",
    "unset_preferred_location": "CU_MEM_ADVISE_UNSET_PREFERRED_LOCATION",
    "set_accessed_by": "CU_MEM_ADVISE_SET_ACCESSED_BY",
    "unset_accessed_by": "CU_MEM_ADVISE_UNSET_ACCESSED_BY",
}


cdef frozenset _MANAGED_ADVICE_IGNORE_LOCATION = frozenset((
    "set_read_mostly",
    "unset_read_mostly",
    "unset_preferred_location",
))


cdef frozenset _ALL_LOCATION_TYPES = frozenset(("device", "host", "host_numa", "host_numa_current"))
cdef frozenset _DEVICE_HOST_NUMA = frozenset(("device", "host", "host_numa"))
cdef frozenset _DEVICE_HOST_ONLY = frozenset(("device", "host"))


cdef dict _MANAGED_ADVICE_ALLOWED_LOCTYPES = {
    "set_read_mostly": _DEVICE_HOST_NUMA,
    "unset_read_mostly": _DEVICE_HOST_NUMA,
    "set_preferred_location": _ALL_LOCATION_TYPES,
    "unset_preferred_location": _DEVICE_HOST_NUMA,
    "set_accessed_by": _DEVICE_HOST_ONLY,
    "unset_accessed_by": _DEVICE_HOST_ONLY,
}


# Reverse lookup: enum value → alias. Built once at module load.
cdef dict _ADVICE_ENUM_TO_ALIAS = {
    getattr(driver.CUmem_advise, attr_name): alias
    for alias, attr_name in _MANAGED_ADVICE_ALIASES.items()
    if hasattr(driver.CUmem_advise, attr_name)
}


cdef tuple _normalize_managed_advice(object advice):
    cdef str alias
    cdef str attr_name
    if isinstance(advice, str):
        alias = advice.lower()
        attr_name = _MANAGED_ADVICE_ALIASES.get(alias)
        if attr_name is None:
            raise ValueError(
                "advice must be one of "
                f"{tuple(sorted(_MANAGED_ADVICE_ALIASES))!r}, got {advice!r}"
            )
        return alias, getattr(driver.CUmem_advise, attr_name)

    if isinstance(advice, driver.CUmem_advise):
        alias = _ADVICE_ENUM_TO_ALIAS.get(advice)
        if alias is None:
            raise ValueError(f"Unsupported advice value: {advice!r}")
        return alias, advice

    raise TypeError(
        "advice must be a cuda.bindings.driver.CUmem_advise value or a supported string alias"
    )


cdef void _require_managed_buffer(Buffer self, str what):
    # Buffer.is_managed handles both pointer-attribute and memory-resource
    # paths (e.g. pool-allocated managed memory whose pointer attribute
```
This block (_MANAGED_ADVICE_ALIASES dict, _ADVICE_ENUM_TO_ALIAS reverse dict, and _normalize_managed_advice function, ~60 lines) appears to be dead code in the current design.
Since advise is now exclusively exposed through ManagedBuffer property setters (buf.read_mostly = True, buf.preferred_location = Device(0), buf.accessed_by.add(...)), all calls to _advise_one pass the CUmem_advise enum value directly. The string-alias lookup path (isinstance(advice, str) → dict lookup → attribute resolution) is never exercised by any public API.
If string advice names aren't part of the API contract going forward, this code should be simplified to only handle the enum path — or removed entirely.
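The enum-only simplification suggested here might look like the following, sketched with a stand-in `IntEnum` since the real enum is `driver.CUmem_advise` (all names below are illustrative):

```python
from enum import IntEnum

class MemAdvise(IntEnum):  # stand-in for driver.CUmem_advise
    SET_READ_MOSTLY = 1
    UNSET_READ_MOSTLY = 2
    SET_PREFERRED_LOCATION = 3

# Reverse map built once at module load, as the current code already does.
_ENUM_TO_ALIAS = {member: member.name.lower() for member in MemAdvise}

def normalize_advice(advice):
    # Enum-only path: the string-alias table, lowercase lookup, and
    # getattr-based attribute resolution all disappear.
    if not isinstance(advice, MemAdvise):
        raise TypeError("advice must be a CUmem_advise value")
    return _ENUM_TO_ALIAS[advice], advice

assert normalize_advice(MemAdvise.SET_READ_MOSTLY) == ("set_read_mostly", MemAdvise.SET_READ_MOSTLY)
```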
```python
    plain.close()


class TestHost:
```
There was a problem hiding this comment.
This test should be moved to its own file (like Device)
```python
try:
    from cuda.bindings import driver
except ImportError:
    from cuda import cuda as driver
```
There was a problem hiding this comment.
The fact that these are needed at test time rings a bell. cuda.core tries hard to not leak the abstraction. This highlights a problem that we do not expose enough mem-range attributes for ManagedBuffer. We should have a follow-up PR on this.
```python
def test_managed_memory_prefetch_supports_managed_pool_allocations(memory_pool_device, memory_pool_mr):
    device = memory_pool_device
    buffer = memory_pool_mr.allocate(_MANAGED_TEST_ALLOCATION_SIZE, stream=device.default_stream)
    stream = device.create_stream()

    buffer.prefetch(Host(), stream=stream)
    stream.sync()
    last_location = _get_int_attr(
        buffer,
        driver.CUmem_range_attribute.CU_MEM_RANGE_ATTRIBUTE_LAST_PREFETCH_LOCATION,
    )
    assert last_location == _HOST_LOCATION_ID

    buffer.prefetch(device, stream=stream)
    stream.sync()
    last_location = _get_int_attr(
        buffer,
        driver.CUmem_range_attribute.CU_MEM_RANGE_ATTRIBUTE_LAST_PREFETCH_LOCATION,
    )
    assert last_location == device.device_id

    buffer.close()


def test_managed_memory_advise_supports_external_managed_allocations(location_ops_device):
    plain = DummyUnifiedMemoryResource(location_ops_device).allocate(_MANAGED_TEST_ALLOCATION_SIZE)
    buffer = ManagedBuffer.from_handle(plain.handle, plain.size, owner=plain)

    buffer.read_mostly = True
    assert (
        _get_int_attr(
            buffer,
            driver.CUmem_range_attribute.CU_MEM_RANGE_ATTRIBUTE_READ_MOSTLY,
        )
        == _READ_MOSTLY_ENABLED
    )

    buffer.preferred_location = Host()
    preferred_location = _get_int_attr(
        buffer,
        driver.CUmem_range_attribute.CU_MEM_RANGE_ATTRIBUTE_PREFERRED_LOCATION,
    )
    assert preferred_location == _HOST_LOCATION_ID

    plain.close()


def test_managed_memory_prefetch_supports_external_managed_allocations(location_ops_device):
    plain = DummyUnifiedMemoryResource(location_ops_device).allocate(_MANAGED_TEST_ALLOCATION_SIZE)
    buffer = ManagedBuffer.from_handle(plain.handle, plain.size, owner=plain)
    stream = location_ops_device.create_stream()

    buffer.prefetch(location_ops_device, stream=stream)
    stream.sync()

    last_location = _get_int_attr(
        buffer,
        driver.CUmem_range_attribute.CU_MEM_RANGE_ATTRIBUTE_LAST_PREFETCH_LOCATION,
    )
    assert last_location == location_ops_device.device_id

    plain.close()


def test_managed_memory_discard_prefetch_supports_managed_pool_allocations(
    discard_prefetch_device, discard_prefetch_buffer
):
    device = discard_prefetch_device
    buffer = discard_prefetch_buffer
    stream = device.create_stream()

    buffer.prefetch(Host(), stream=stream)
    stream.sync()

    buffer.discard_prefetch(device, stream=stream)
    stream.sync()

    last_location = _get_int_attr(
        buffer,
        driver.CUmem_range_attribute.CU_MEM_RANGE_ATTRIBUTE_LAST_PREFETCH_LOCATION,
    )
    assert last_location == device.device_id


def test_managed_memory_discard_prefetch_supports_external_managed_allocations(discard_prefetch_device):
    device = discard_prefetch_device
    plain = DummyUnifiedMemoryResource(device).allocate(_MANAGED_TEST_ALLOCATION_SIZE)
    buffer = ManagedBuffer.from_handle(plain.handle, plain.size, owner=plain)
    stream = device.create_stream()

    buffer.prefetch(Host(), stream=stream)
    stream.sync()

    buffer.discard_prefetch(device, stream=stream)
    stream.sync()

    last_location = _get_int_attr(
        buffer,
        driver.CUmem_range_attribute.CU_MEM_RANGE_ATTRIBUTE_LAST_PREFETCH_LOCATION,
    )
    assert last_location == device.device_id

    plain.close()


def test_managed_memory_operations_reject_non_managed_allocations(init_cuda):
    """Wrapping a non-managed pointer in ManagedBuffer raises at op time."""
    device = Device()
    device.set_current()

    plain = DummyDeviceMemoryResource(device).allocate(_MANAGED_TEST_ALLOCATION_SIZE)
    # Wrapping a device-only pointer as ManagedBuffer is allowed at construction
    # (no driver query yet); the runtime managed-ness check fires at op time.
    buffer = ManagedBuffer.from_handle(plain.handle, plain.size, owner=plain)
    stream = device.create_stream()

    with pytest.raises(ValueError, match="managed-memory allocation"):
        buffer.read_mostly = True
    with pytest.raises(ValueError, match="managed-memory allocation"):
        buffer.prefetch(device, stream=stream)
    with pytest.raises(ValueError, match="managed-memory allocation"):
        buffer.discard_prefetch(device, stream=stream)

    plain.close()


def test_managed_memory_operation_validation(memory_pool_device, memory_pool_mr):
    device = memory_pool_device
    buffer = memory_pool_mr.allocate(_MANAGED_TEST_ALLOCATION_SIZE, stream=device.default_stream)
    stream = device.create_stream()

    with pytest.raises(ValueError, match="location is required"):
        buffer.prefetch(None, stream=stream)

    # CUDA 13: kind-allowed check fires (ValueError). CUDA 12: NUMA-host is
    # rejected at the boundary first (TypeError).
    with pytest.raises(
        (ValueError, TypeError),
        match="does not support location_type='host_numa'|require a CUDA 13 build",
    ):
        buffer.accessed_by.add(Host(numa_id=_INVALID_HOST_DEVICE_ORDINAL))

    buffer.close()


def test_managed_memory_advise_location_validation(location_ops_device):
    """Verify doc-specified location constraints for each advice kind."""
    device = location_ops_device
    plain = DummyUnifiedMemoryResource(device).allocate(_MANAGED_TEST_ALLOCATION_SIZE)
    buffer = ManagedBuffer.from_handle(plain.handle, plain.size, owner=plain)

    # read_mostly works without a location
```
These 8 standalone test functions are largely redundant with TestManagedBuffer methods — they test the same operations (prefetch, read_mostly, preferred_location, discard_prefetch) but with a different buffer source (pool-allocated vs external DummyUnifiedMemoryResource + from_handle).
The two axes being tested are operation × buffer source. This is a textbook `@pytest.fixture(params=...)` case. One parametrized fixture:

```python
@pytest.fixture(params=["pool", "external"], ids=["pool", "external"])
def managed_buffer(request, location_ops_device):
    if request.param == "pool":
        mr = create_managed_memory_resource_or_skip()
        buf = mr.allocate(_MANAGED_TEST_ALLOCATION_SIZE, stream=location_ops_device.default_stream)
        yield buf
        buf.close()
        mr.close()
    else:
        plain = DummyUnifiedMemoryResource(location_ops_device).allocate(_MANAGED_TEST_ALLOCATION_SIZE)
        buf = ManagedBuffer.from_handle(plain.handle, plain.size, owner=plain)
        yield buf
        plain.close()
```

Then each TestManagedBuffer test runs twice (pool and external) and these 8 standalone functions (~150 lines) can be deleted entirely. conftest already uses this factory pattern with `memory_resource_factory` (parametrized over DeviceMR/PinnedMR/ManagedMR).
```python
@pytest.fixture
def memory_pool_device(init_cuda):
    device = Device()
    skip_if_managed_memory_unsupported(device)
    device.set_current()
    return device


@pytest.fixture
def location_ops_device(init_cuda):
    device = Device()
    _skip_if_managed_location_ops_unsupported(device)
    device.set_current()
    return device


@pytest.fixture
def discard_prefetch_device(init_cuda):
    device = Device()
    _skip_if_managed_discard_prefetch_unsupported(device)
    device.set_current()
    return device


@pytest.fixture
def memory_pool_mr(memory_pool_device):
    return create_managed_memory_resource_or_skip()


@pytest.fixture
def location_ops_mr(location_ops_device):
    return create_managed_memory_resource_or_skip()


@pytest.fixture
def discard_prefetch_mr(discard_prefetch_device):
    return create_managed_memory_resource_or_skip()


@pytest.fixture
def location_ops_buffer(location_ops_device, location_ops_mr):
    buf = location_ops_mr.allocate(_MANAGED_TEST_ALLOCATION_SIZE, stream=location_ops_device.default_stream)
    yield buf
    buf.close()


@pytest.fixture
def discard_prefetch_buffer(discard_prefetch_device, discard_prefetch_mr):
    buf = discard_prefetch_mr.allocate(_MANAGED_TEST_ALLOCATION_SIZE, stream=discard_prefetch_device.default_stream)
    yield buf
    buf.close()
```
9 fixtures is a lot for what amounts to 3 skip-tiers × {device, mr, buffer}. The three *_mr fixtures are byte-for-byte identical (create_managed_memory_resource_or_skip()), and the three *_device fixtures differ only in which skip function they call.
Suggestion: keep only location_ops_device + location_ops_mr (the most common tier) and a managed_buffer fixture parametrized over buffer source (see comment on standalone tests). The 2–3 tests needing memory_pool_* or discard_prefetch_* can call their specific skip function inline — they already know their requirement. This would reduce 9 fixtures (~50 lines) to 3–4 (~20 lines).
Also, conftest already has an init_cuda fixture that calls Device(0).set_current(). The *_device fixtures duplicate that work (they call Device(); device.set_current() inside). They could depend on init_cuda and just add the skip check.
```python
    skip_if_managed_memory_unsupported,
)
from cuda.core import Device, Host, ManagedBuffer
from cuda.core._memory._managed_buffer import _get_int_attr
```
nit: `_get_int_attr(buf, driver.CUmem_range_attribute.CU_MEM_RANGE_ATTRIBUTE_LAST_PREFETCH_LOCATION)` is repeated 8 times verbatim in this file. A one-liner helper would improve readability:

```python
def _last_prefetch_location(buf):
    return _get_int_attr(buf, driver.CUmem_range_attribute.CU_MEM_RANGE_ATTRIBUTE_LAST_PREFETCH_LOCATION)
```

Note: this concern becomes moot once ManagedBuffer exposes mem-range attributes directly (per the pending comment on line 17 about leaking the `cuda.bindings.driver` abstraction). Once `buf.last_prefetch_location` or similar exists, tests would use that instead of reaching into driver internals.
```python
        assert _coerce_location(None, allow_none=True) is None

    def test_int_rejected(self):
        from cuda.core._memory._managed_location import _coerce_location

        # int shorthand was removed in favor of explicit Device/Host
        with pytest.raises(TypeError, match="Device, Host, or None"):
            _coerce_location(0)

    def test_bad_type(self):
        from cuda.core._memory._managed_location import _coerce_location

        with pytest.raises(TypeError, match="Device, Host, or None"):
            _coerce_location("device")


class TestPrefetchBatch:
    """Tests for utils.prefetch_batch (batched-only free function)."""

    def test_same_location(self, memory_pool_device, memory_pool_mr):
        from cuda.core.utils import prefetch_batch

        device = memory_pool_device
        bufs = [memory_pool_mr.allocate(_MANAGED_TEST_ALLOCATION_SIZE, stream=device.default_stream) for _ in range(3)]
        stream = device.create_stream()

        prefetch_batch(stream, bufs, device)
        stream.sync()

        for buf in bufs:
            last = _get_int_attr(
                buf,
                driver.CUmem_range_attribute.CU_MEM_RANGE_ATTRIBUTE_LAST_PREFETCH_LOCATION,
            )
            assert last == device.device_id
            buf.close()

    def test_per_buffer_location(self, memory_pool_device, memory_pool_mr):
        from cuda.core.utils import prefetch_batch

        device = memory_pool_device
        bufs = [memory_pool_mr.allocate(_MANAGED_TEST_ALLOCATION_SIZE, stream=device.default_stream) for _ in range(2)]
        stream = device.create_stream()

        prefetch_batch(stream, bufs, [Host(), device])
        stream.sync()

        last0 = _get_int_attr(
            bufs[0],
            driver.CUmem_range_attribute.CU_MEM_RANGE_ATTRIBUTE_LAST_PREFETCH_LOCATION,
        )
        last1 = _get_int_attr(
            bufs[1],
            driver.CUmem_range_attribute.CU_MEM_RANGE_ATTRIBUTE_LAST_PREFETCH_LOCATION,
        )
        assert last0 == _HOST_LOCATION_ID
        assert last1 == device.device_id
        for buf in bufs:
            buf.close()

    def test_length_mismatch(self, memory_pool_device, memory_pool_mr):
        from cuda.core.utils import prefetch_batch

        device = memory_pool_device
        bufs = [memory_pool_mr.allocate(_MANAGED_TEST_ALLOCATION_SIZE, stream=device.default_stream) for _ in range(2)]
        stream = device.create_stream()

        with pytest.raises(ValueError, match="length"):
            prefetch_batch(stream, bufs, [Host()])
        for buf in bufs:
            buf.close()

    def test_rejects_single_buffer(self, memory_pool_device, memory_pool_mr):
        from cuda.core.utils import prefetch_batch

        device = memory_pool_device
        buf = memory_pool_mr.allocate(_MANAGED_TEST_ALLOCATION_SIZE, stream=device.default_stream)
        stream = device.create_stream()
        with pytest.raises(TypeError, match="sequence of Buffers"):
            prefetch_batch(stream, buf, Host())
        buf.close()


class TestDiscardBatch:
    """Tests for utils.discard_batch (batched-only free function)."""

    def test_basic(self, memory_pool_device, memory_pool_mr):
        from cuda.core.utils import discard_batch, prefetch_batch

        if not hasattr(driver, "cuMemDiscardBatchAsync"):
            pytest.skip("cuMemDiscardBatchAsync unavailable")
        device = memory_pool_device
        bufs = [memory_pool_mr.allocate(_MANAGED_TEST_ALLOCATION_SIZE, stream=device.default_stream) for _ in range(3)]
        stream = device.create_stream()
        prefetch_batch(stream, bufs, device)
        stream.sync()
        discard_batch(stream, bufs)
        stream.sync()
        for buf in bufs:
            buf.close()

    def test_rejects_single_buffer(self, memory_pool_device, memory_pool_mr):
        from cuda.core.utils import discard_batch

        device = memory_pool_device
        buf = memory_pool_mr.allocate(_MANAGED_TEST_ALLOCATION_SIZE, stream=device.default_stream)
        stream = device.create_stream()
        with pytest.raises(TypeError, match="sequence of Buffers"):
            discard_batch(stream, buf)
        buf.close()


class TestDiscardPrefetchBatch:
    """Tests for utils.discard_prefetch_batch (batched-only free function)."""

    def test_same_location(self, memory_pool_device, memory_pool_mr):
        from cuda.core.utils import discard_prefetch_batch, prefetch_batch

        if not hasattr(driver, "cuMemDiscardAndPrefetchBatchAsync"):
            pytest.skip("cuMemDiscardAndPrefetchBatchAsync unavailable")
        device = memory_pool_device
        bufs = [memory_pool_mr.allocate(_MANAGED_TEST_ALLOCATION_SIZE, stream=device.default_stream) for _ in range(2)]
        stream = device.create_stream()
        prefetch_batch(stream, bufs, Host())
        stream.sync()
        discard_prefetch_batch(stream, bufs, device)
        stream.sync()
        for buf in bufs:
            last = _get_int_attr(
                buf,
                driver.CUmem_range_attribute.CU_MEM_RANGE_ATTRIBUTE_LAST_PREFETCH_LOCATION,
            )
            assert last == device.device_id
            buf.close()

    def test_length_mismatch(self, memory_pool_device, memory_pool_mr):
        from cuda.core.utils import discard_prefetch_batch

        device = memory_pool_device
        bufs = [memory_pool_mr.allocate(_MANAGED_TEST_ALLOCATION_SIZE, stream=device.default_stream) for _ in range(2)]
        stream = device.create_stream()
        with pytest.raises(ValueError, match="length"):
            discard_prefetch_batch(stream, bufs, [Host()])
        for buf in bufs:
            buf.close()

    def test_rejects_single_buffer(self, memory_pool_device, memory_pool_mr):
        from cuda.core.utils import discard_prefetch_batch

        device = memory_pool_device
        buf = memory_pool_mr.allocate(_MANAGED_TEST_ALLOCATION_SIZE, stream=device.default_stream)
        stream = device.create_stream()
        with pytest.raises(TypeError, match="sequence of Buffers"):
            discard_prefetch_batch(stream, buf, Host())
        buf.close()
```
test_rejects_single_buffer is copy-pasted across all three batch test classes (TestPrefetchBatch, TestDiscardBatch, TestDiscardPrefetchBatch) — identical except for the function name. test_length_mismatch is also duplicated across two. These can be parametrized into standalone tests:
```python
@pytest.mark.parametrize("fn_name,needs_loc", [
    ("prefetch_batch", True),
    ("discard_batch", False),
    ("discard_prefetch_batch", True),
])
def test_batch_rejects_single_buffer(memory_pool_device, memory_pool_mr, fn_name, needs_loc):
    from cuda.core import utils

    fn = getattr(utils, fn_name)
    buf = memory_pool_mr.allocate(_MANAGED_TEST_ALLOCATION_SIZE, stream=memory_pool_device.default_stream)
    stream = memory_pool_device.create_stream()
    args = (stream, buf, Host()) if needs_loc else (stream, buf)
    with pytest.raises(TypeError, match="sequence of Buffers"):
        fn(*args)
    buf.close()
```

With this plus parametrization of `test_length_mismatch`, the three batch test classes shrink significantly, or could be merged into one `TestBatchOps` class.
```python
def _skip_if_managed_allocation_unsupported(device):
    try:
        if not device.properties.managed_memory:
            pytest.skip("Device does not support managed memory operations")
    except AttributeError:
        pytest.skip("Managed-memory buffer operations require CUDA support")


def _skip_if_managed_location_ops_unsupported(device):
    _skip_if_managed_allocation_unsupported(device)
    try:
        if not device.properties.concurrent_managed_access:
            pytest.skip("Device does not support concurrent managed memory access")
    except AttributeError:
        pytest.skip("Managed-memory location operations require CUDA support")


def _skip_if_managed_discard_prefetch_unsupported(device):
    _skip_if_managed_location_ops_unsupported(device)
    if not hasattr(driver, "cuMemDiscardAndPrefetchBatchAsync"):
        pytest.skip("discard-prefetch requires cuda.bindings support")

    visible_devices = Device.get_all_devices()
    if not all(dev.properties.concurrent_managed_access for dev in visible_devices):
        pytest.skip("discard-prefetch requires concurrent managed access on all visible devices")
```
The local `_skip_if_managed_allocation_unsupported` vs conftest's `skip_if_managed_memory_unsupported` have confusingly similar names but check different things:

- conftest: "can I create a `ManagedMemoryResource`?" (pool infra + runtime check)
- local: "does the device support `cuMemAllocManaged`?" (raw managed-alloc capability)

The local version is needed because `DummyUnifiedMemoryResource` uses `cuMemAllocManaged` directly, not `ManagedMemoryResource`. But the naming makes it look like a copy-paste bug.

Suggestions:

- Rename to `_skip_if_cuMemAllocManaged_unsupported` to make the distinction obvious.
- Better: inline this check into `DummyUnifiedMemoryResource` itself (it already knows it needs managed memory; it could skip/raise at allocation time), eliminating the need for callers to remember which skip function to use. `_skip_if_managed_allocation_unsupported` is only ever called by `_skip_if_managed_location_ops_unsupported`, so it could be inlined there (it is four lines).
Summary

Adds managed-memory range operations to `cuda.core`:

- `cuda.core.utils`: `advise`, `prefetch`, `discard`, `discard_prefetch`. Each accepts either a single `Buffer` or a sequence; N==1 dispatches to the per-range driver entry point and N>1 dispatches to the corresponding `cuMem*BatchAsync` (CUDA 13+).
- `Host` — new top-level singleton class symmetric to `Device`. `Host()` (any host), `Host(numa_id=N)`, `Host.numa_current()`. Same-argument constructions are interned (`Host() is Host()`). Used together with `Device` to express managed-memory locations.
- `ManagedBuffer` — `Buffer` subclass returned by `ManagedMemoryResource.allocate`. Exposes a Pythonic property-style advice API on top of the same free functions. Wrap an external managed pointer with `Buffer.from_handle(...)` (now a `@classmethod`, so `ManagedBuffer.from_handle(...)` returns a `ManagedBuffer`).

Closes #1332. Addresses the managed-memory portion of #1333 (P1: `cuMemPrefetchBatchAsync`, `cuMemDiscardBatchAsync`, `cuMemDiscardAndPrefetchBatchAsync`). The P0 `cuMemcpyBatchAsync` from #1333 is intentionally out of scope and tracked separately; the holistic batched-API contract this PR commits to is documented in #issuecomment-4355502334 so the upcoming `cuMemcpyBatchAsync` work can mirror it.

Public API
`ManagedBuffer` — property-style advice on managed allocations

`ManagedMemoryResource.allocate` returns a `ManagedBuffer` (a `Buffer` subclass). All ManagedBuffer-specific behavior is layered on top of the free functions, so the two surfaces stay consistent.

Free functions — `advise` / `prefetch` / `discard` / `discard_prefetch`

Each accepts a `Buffer` (or `ManagedBuffer`) or a sequence of them. Locations are expressed via `Device` or `Host`.

Batched form — same function, sequence of targets
When N>1, dispatch goes to the corresponding `cuMem*BatchAsync`. Sequence locations are paired by index; a scalar location broadcasts to every target. Mismatched sequence lengths raise `ValueError`. On a CUDA 12 build of `cuda.core`, N>1 raises `NotImplementedError` (the `*BatchAsync` entry points are CUDA 13+); N==1 works on every supported toolkit.

Putting it together
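The pairing/broadcast rule above can be sketched in a few lines of plain Python (`pair_locations` is a hypothetical helper name, not the library's internal):

```python
def pair_locations(buffers, location):
    """Pair each buffer with a target location.

    A sequence of locations is matched by index and must have the same
    length as the buffer sequence; a scalar broadcasts to every buffer.
    """
    if isinstance(location, (list, tuple)):
        if len(location) != len(buffers):
            raise ValueError(
                f"location sequence length {len(location)} does not match "
                f"buffer count {len(buffers)}"
            )
        return list(zip(buffers, location))
    return [(buf, location) for buf in buffers]

assert pair_locations(["b0", "b1"], "host") == [("b0", "host"), ("b1", "host")]
assert pair_locations(["b0", "b1"], ["host", "dev0"]) == [("b0", "host"), ("b1", "dev0")]
```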
Implementation notes

- `cuda_core/cuda/core/_memory/_managed_memory_ops.pyx` uses `cimport cydriver` for direct C-level driver calls.
- `cuMemAdvise` and `cuMemPrefetchAsync`: handled at compile time with `IF CUDA_CORE_BUILD_MAJOR >= 13:` / `ELSE:`.
- The batch entry points (`cuMemPrefetchBatchAsync`, `cuMemDiscardBatchAsync`, `cuMemDiscardAndPrefetchBatchAsync`) are CUDA 13+ only. On CUDA 12 builds, N>1 calls raise `NotImplementedError`; single-buffer calls work everywhere.
- `Host` is a singleton with `__slots__` and a `__new__`-based intern cache keyed by `(numa_id, is_numa_current)`. Same-argument constructions return the same instance on both Python and Cython call paths.
- `ManagedBuffer` is a pure-Python subclass of the Cython `Buffer` cdef class. `Buffer.from_handle` is now a `@classmethod` (was `@staticmethod`) so `MyBufferSubclass.from_handle(...)` returns the typed instance via `cls._init`.
- `Buffer._from_deviceptr_handle` and `_MP_allocate` thread an optional `cls` parameter so `ManagedMemoryResource.allocate` materializes a `ManagedBuffer`.
- `_LocSpec` (in `_managed_location.py`) carries the `(kind, id)` discriminator that the Cython layer maps to `CUmemLocation` (CUDA 13) or a legacy device ordinal (CUDA 12). Public callers see only `Device`/`Host`; `_coerce_location` produces the internal record.
- `_buffer.pyx` collapses `out.is_managed = (is_managed != 0)` to a single unconditional assignment and adds a TODO noting that HMM/ATS-mapped sysmem is not yet captured by `CU_POINTER_ATTRIBUTE_IS_MANAGED`.
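The `Host` interning described in the notes above can be illustrated with a pure-Python sketch (the real implementation also covers the Cython call path and other details omitted here):

```python
class Host:
    """Sketch of a __new__-based intern cache keyed on constructor args."""
    __slots__ = ("numa_id", "is_numa_current")
    _cache = {}  # (numa_id, is_numa_current) -> interned instance

    def __new__(cls, numa_id=None, is_numa_current=False):
        if numa_id is not None and is_numa_current:
            raise ValueError("numa_id and is_numa_current are mutually exclusive")
        key = (numa_id, bool(is_numa_current))
        inst = cls._cache.get(key)
        if inst is None:
            inst = super().__new__(cls)
            inst.numa_id = numa_id
            inst.is_numa_current = bool(is_numa_current)
            cls._cache[key] = inst
        return inst

    @classmethod
    def numa_current(cls):
        # Thin alias over the constructor, per the PR description.
        return cls(is_numa_current=True)

assert Host() is Host()
assert Host(numa_id=2) is Host(numa_id=2)
assert Host(numa_id=2) is not Host()
assert Host.numa_current() is Host(is_numa_current=True)
```

Because `__new__` returns the cached instance before `__init__` would run, same-argument constructions are identity-equal, which is what makes `Host()` usable as a cheap sentinel-like location value.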