Speed-up Parquet data generation#10
Speed-up Parquet data generation#10wolfgang-desalvador wants to merge 1 commit intomlcommons:mainfrom
Conversation
|
This method needs to be validated @russfellows @wvaske @FileSystemGuy since it would require from the processes the ability to keep in memory the whole 3+ GiB parquet file. |
|
This looks like a good change. I will try this out, and see if there are any further optimizations that can be made as well. Thanks Wolfgang |
… into row groups Based on mlcommons#10 (Wolfgang De Salvador). Generate all column data in one pass before the batch loop, then use pa.Table.slice() (zero-copy in Arrow) to produce each row-group batch. Reduces generation call overhead from (num_batches × num_columns) to just num_columns calls. For a file with 10 batches and 5 columns this is a 10× reduction in gen_random_tensor calls. Improvement over upstream PR: added explicit memory trade-off comment and clarified the zero-copy slice semantics.
|
@mlcommons/wg-storage-approvers Could you take a look at this PR? |
|
Wolfgang,
I will look this evening. I am attending an AI conference today, so fairly tied up. But, I like the idea, so thank you.
Regards,
—Russ
… On Apr 14, 2026, at 5:30 AM, Wolfgang De Salvador ***@***.***> wrote:
wolfgang-desalvador
left a comment
(mlcommons/DLIO_local_changes#10)
<#10 (comment)>
@mlcommons/wg-storage-approvers <https://github.com/orgs/mlcommons/teams/wg-storage-approvers> Could you take a look at this PR?
—
Reply to this email directly, view it on GitHub <#10 (comment)>, or unsubscribe <https://github.com/notifications/unsubscribe-auth/AF64UJ5CEA44GBAWWPPVHDT4VYONFAVCNFSM6AAAAACXQ7NHPSVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHM2DENBTGUZDGOJRGY>.
You are receiving this because you were mentioned.
|
…d, CI thread cap (#12) * Fix s3pytorch force path style boolean option. * Refactor S3 pytorch implementation. Change code to use storage_root config option and namespace. Removes urlparsing for each I/O. Updates some default config options to be sane for both file and object. * feat: Add multi-library S3 storage support (minio, s3dlio, s3torch) - Add StorageLibrary enum to consistently select S3 libraries - Refactor storage_factory to route to selected library backends - Implement MinIO storage backend with MPI rank-based endpoint selection - Implement s3dlio storage backend with native multi-endpoint support - Enable comparison testing across S3 client libraries This enables DLIO benchmarks to test different S3 client implementations for performance comparison and multi-endpoint load balancing strategies. * refactor: convert direct imports to lazy imports in profiler_factory (#325) - Move profiler imports inside get_profiler() method - Benefits: - Avoids loading TFProfiler (which imports tensorflow) unless needed - Reduces import overhead for users not using TENSORBOARD profiler - Default profiler (IOSTAT) no longer triggers tensorflow import - No breaking changes - same API, same behavior * feat: add native AIStore storage backend (#321) Add a native AIStore storage handler that uses the official AIStore Python SDK for direct access, bypassing the S3 compatibility layer for better performance and simpler configuration. Changes: - Add AIStoreStorage class with full CRUD operations, range reads, and prefix-based object listing - Add StorageType.AISTORE enum and wire it through StorageFactory, GeneratorFactory, and ReaderFactory (reuses S3 generators/readers) - Add AIStore endpoint configuration support in ConfigArguments - Add 'aistore' optional dependency in setup.py - Add mock-based test suite with full AIStore SDK simulation - Add CI workflow for AIStore tests - Add storage configuration section to documentation Supported formats: NPY, NPZ, JPEG Supported frameworks: PyTorch, TensorFlow Signed-off-by: Abhishek Gaikwad <gaikwadabhishek1997@gmail.com> * fix(counters): train phase was not evaluated (#328) * fix(counters): train phase was not evaluated PR #302 moved loop breaking condition from the end of the loop at its start. Which never fires self.stats.end_block of the current block as the iteration never start. Trying regulat pytorch loader from local fs: ``` [OUTPUT] 2026-02-27T06:58:50.214359 Running DLIO [Training & Evaluation] with 2 process(es) [WARNING] The amount of dataset is smaller than the host memory; data might be cached after the first epoch. Increase the size of dataset to eliminate the caching effect!!! [OUTPUT] 2026-02-27T06:58:50.229669 Max steps per epoch: 128 = 1 * 1024 / 4 / 2 (samples per file * num files / batch size / comm size) [OUTPUT] 2026-02-27T06:58:50.229764 Steps per eval: 32 = 1 * 64 / 1 / 2 (samples per file * num files / batch size eval / comm size) [OUTPUT] 2026-02-27T06:58:50.278417 Starting epoch 1: 128 steps expected [OUTPUT] 2026-02-27T06:58:50.278614 Starting block 1 [OUTPUT] 2026-02-27T06:59:03.743752 Ending epoch 1 - 128 steps completed in 13.47 s [OUTPUT] 2026-02-27T06:59:03.747196 Starting eval - 32 steps expected [OUTPUT] 2026-02-27T06:59:07.122980 Ending eval - 32 steps completed in 3.38 s [OUTPUT] 2026-02-27T06:59:07.124598 Epoch 1 [Eval] Accelerator Utilization [AU] (%): 99.4141 [OUTPUT] 2026-02-27T06:59:07.124644 Epoch 1 [Eval] Throughput (samples/second): 18.9592 [OUTPUT] 2026-02-27T06:59:07.130596 Starting epoch 2: 128 steps expected [OUTPUT] 2026-02-27T06:59:07.130832 Starting block 1 [OUTPUT] 2026-02-27T06:59:20.047588 Ending epoch 2 - 128 steps completed in 12.92 s [OUTPUT] 2026-02-27T06:59:20.048553 Starting eval - 32 steps expected [OUTPUT] 2026-02-27T06:59:23.276666 Ending eval - 32 steps completed in 3.23 s [OUTPUT] 2026-02-27T06:59:23.277556 Epoch 2 [Eval] Accelerator Utilization [AU] (%): 99.4022 [OUTPUT] 2026-02-27T06:59:23.277595 Epoch 2 [Eval] Throughput (samples/second): 19.8261 [OUTPUT] 2026-02-27T06:59:23.280422 Starting epoch 3: 128 steps expected [OUTPUT] 2026-02-27T06:59:23.280591 Starting block 1 [OUTPUT] 2026-02-27T06:59:36.196122 Ending epoch 3 - 128 steps completed in 12.92 s [OUTPUT] 2026-02-27T06:59:36.197005 Starting eval - 32 steps expected [OUTPUT] 2026-02-27T06:59:39.425806 Ending eval - 32 steps completed in 3.23 s [OUTPUT] 2026-02-27T06:59:39.426645 Epoch 3 [Eval] Accelerator Utilization [AU] (%): 99.4032 [OUTPUT] 2026-02-27T06:59:39.426682 Epoch 3 [Eval] Throughput (samples/second): 19.8219 [OUTPUT] 2026-02-27T06:59:39.469524 Saved outputs in /lus/flare/projects/DAOS_Testing/PAP166/hydra_log/default/2026-02-27-06-58-50 [OUTPUT] Averaged metric over all steps/epochs [METRIC] ========================================================== [METRIC] Number of Simulated Accelerators: 2 [METRIC] Training Accelerator Utilization [AU] (%): 0.0000 (0.0000) [METRIC] Training Throughput (samples/second): 0.0000 (0.0000) [METRIC] Training I/O Throughput (MB/second): 0.0000 (0.0000) [METRIC] train_au_meet_expectation: fail [METRIC] Eval Accelerator Utilization [AU] (%): 49.7048 (0.0028) [METRIC] Eval Throughput (samples/second): 9.765259 (0.206374) [METRIC] Eval Throughput (MB/second): 0.038146 (0.000806) [METRIC] eval_au_meet_expectation: fail [METRIC] ========================================================== [OUTPUT] 2026-02-27T06:59:39.484237 outputs saved in RANKID_output.json ``` Notice that logs are only show starting of the block and never its ending. After the fix: ``` [OUTPUT] 2026-02-28T12:30:28.000590 Running DLIO [Training & Evaluation] with 2 process(es) [WARNING] The amount of dataset is smaller than the host memory; data might be cached after the first epoch. Increase the size of dataset to eliminate the caching effect!!! [WARNING] Number of files for training in /dataset/train (4000) is more than requested (64). A subset of files will be used [WARNING] Number of files for training in /dataset/train (4000) is more than requested (64). A subset of files will be used [OUTPUT] 2026-02-28T12:30:28.102857 Max steps per epoch: 8 = 1 * 64 / 4 / 2 (samples per file * num files / batch size / comm size) [OUTPUT] 2026-02-28T12:30:28.102992 Steps per eval: 4 = 1 * 8 / 1 / 2 (samples per file * num files / batch size eval / comm size) [OUTPUT] 2026-02-28T12:30:30.572480 Starting epoch 1: 8 steps expected [OUTPUT] 2026-02-28T12:30:30.573084 Starting block 1 [OUTPUT] 2026-02-28T12:30:30.734535 Ending block 1 - 8 steps completed in 0.16 s [OUTPUT] 2026-02-28T12:30:30.740906 Epoch 1 - Block 1 [Training] Accelerator Utilization [AU] (%): 0.1428 [OUTPUT] 2026-02-28T12:30:30.740994 Epoch 1 - Block 1 [Training] Throughput (samples/second): 1753.1357 [OUTPUT] 2026-02-28T12:30:30.741060 Epoch 1 - Block 1 [Training] Computation time per step (second): 0.0000+/-0.0000 (set value: {}) [OUTPUT] 2026-02-28T12:30:30.741497 Ending epoch 1 - 8 steps completed in 0.17 s [OUTPUT] 2026-02-28T12:30:30.742789 Starting eval - 4 steps expected [OUTPUT] 2026-02-28T12:30:30.889307 Ending eval - 4 steps completed in 0.15 s [OUTPUT] 2026-02-28T12:30:30.891985 Epoch 1 [Eval] Accelerator Utilization [AU] (%): 0.0720 [OUTPUT] 2026-02-28T12:30:30.892054 Epoch 1 [Eval] Throughput (samples/second): 54.6620 [OUTPUT] 2026-02-28T12:30:30.900919 Starting epoch 2: 8 steps expected [OUTPUT] 2026-02-28T12:30:30.901249 Starting block 1 [OUTPUT] 2026-02-28T12:30:30.914273 Ending block 1 - 8 steps completed in 0.01 s [OUTPUT] 2026-02-28T12:30:30.915472 Epoch 2 - Block 1 [Training] Accelerator Utilization [AU] (%): 1.9055 [OUTPUT] 2026-02-28T12:30:30.915541 Epoch 2 - Block 1 [Training] Throughput (samples/second): 7765.7316 [OUTPUT] 2026-02-28T12:30:30.915595 Epoch 2 - Block 1 [Training] Computation time per step (second): 0.0000+/-0.0000 (set value: {}) [OUTPUT] 2026-02-28T12:30:30.915931 Ending epoch 2 - 8 steps completed in 0.02 s [OUTPUT] 2026-02-28T12:30:30.917061 Starting eval - 4 steps expected [OUTPUT] 2026-02-28T12:30:30.958733 Ending eval - 4 steps completed in 0.04 s [OUTPUT] 2026-02-28T12:30:30.959729 Epoch 2 [Eval] Accelerator Utilization [AU] (%): 0.0381 [OUTPUT] 2026-02-28T12:30:30.959768 Epoch 2 [Eval] Throughput (samples/second): 192.2493 [OUTPUT] 2026-02-28T12:30:30.960091 Starting epoch 3: 8 steps expected [OUTPUT] 2026-02-28T12:30:30.960275 Starting block 1 [OUTPUT] 2026-02-28T12:30:30.976061 Ending block 1 - 8 steps completed in 0.02 s [OUTPUT] 2026-02-28T12:30:30.977423 Epoch 3 - Block 1 [Training] Accelerator Utilization [AU] (%): 0.6369 [OUTPUT] 2026-02-28T12:30:30.977483 Epoch 3 - Block 1 [Training] Throughput (samples/second): 6020.3520 [OUTPUT] 2026-02-28T12:30:30.977534 Epoch 3 - Block 1 [Training] Computation time per step (second): 0.0000+/-0.0000 (set value: {}) [OUTPUT] 2026-02-28T12:30:30.977792 Ending epoch 3 - 8 steps completed in 0.02 s [OUTPUT] 2026-02-28T12:30:30.978884 Starting eval - 4 steps expected [OUTPUT] 2026-02-28T12:30:30.983803 Ending eval - 4 steps completed in 0.00 s [OUTPUT] 2026-02-28T12:30:30.984927 Epoch 3 [Eval] Accelerator Utilization [AU] (%): 1.3682 [OUTPUT] 2026-02-28T12:30:30.984986 Epoch 3 [Eval] Throughput (samples/second): 1641.1245 [OUTPUT] 2026-02-28T12:30:30.986010 Saved outputs in /home/denis/dev/enakta/dlio_benchmark/hydra_log/default/2026-02-28-12-30-25 [OUTPUT] Averaged metric over all steps/epochs [METRIC] ========================================================== [METRIC] Number of Simulated Accelerators: 2 [METRIC] Training Accelerator Utilization [AU] (%): 0.5939 (0.4129) [METRIC] Training Throughput (samples/second): 4948.3957 (2466.6534) [METRIC] Training I/O Throughput (MB/second): 19.3297 (9.6354) [METRIC] train_au_meet_expectation: fail [METRIC] Eval Accelerator Utilization [AU] (%): 0.4704 (0.5038) [METRIC] Eval Throughput (samples/second): 444.414075 (396.070635) [METRIC] Eval Throughput (MB/second): 1.735992 (1.547151) [METRIC] eval_au_meet_expectation: fail [METRIC] ========================================================== [OUTPUT] 2026-02-28T12:30:30.987839 outputs saved in RANKID_output.json ``` Signed-off-by: Denis Barakhtanov <dbarahtanov@enakta.com> * fix: remove unreachable branch Signed-off-by: Denis Barakhtanov <dbarahtanov@enakta.com> --------- Signed-off-by: Denis Barakhtanov <dbarahtanov@enakta.com> Co-authored-by: Denis Barakhtanov <denis.barahtanov@gmail.com> * refactor(generators): unify generators to work with any storage backend (#329) Every new storage backend required copy-pasting each generator into an _XXX sibling file: npz_generator_s3.py, npy_generator_s3.py and so on. The only difference was whether to write the output locally on disk, directly via numpy/PIL, or via the storage interface. This makes the pattern unsustainable: two duplicated formats today, more with each new backend — incurring a significant maintenance burden. Since all generators already had a storage instance and used it to generate file names, we can leverage it. The only set of generators now can check if the stroage is locally available via `islocalfs` and use some optimisation, if any. If the storage is not local, the sample serializes to io.BytesIO, call buf.getvalue(), and delegate to self.storage.put_data(). All storage backends receive plain bytes as designed by the storage interface, removing type inspection, seek() and getvalue() calls scattered across backends. - FileStorage.put_data was never called, had text-mode open and a double get_uri call (once from the generator, once inside put_data itself). Now it is the default write path for LOCAL_FS, used by almost every workload config. get_data aligned to binary mode ("rb") for consistency. - AIStoreStorage.put_data: remove isinstance dispatch, accept bytes directly. - S3TorchStorage.put_data: remove data.getvalue() — just write data. - generator_factory: removed S3/AIStore branching for NPZ, NPY, JPEG. - factory referenced jpeg_generator_s3.JPEGGeneratorS3 which never existed; JPEG + S3/AIStore would crash at import time. After this patch, adding a new storage backend requires no changes in any generator. Adding a new data format automatically works with all backends. Signed-off-by: Denis Barakhtanov <dbarahtanov@enakta.com> Co-authored-by: Denis Barakhtanov <denis.barahtanov@gmail.com> * feat: object storage integration work-in-progress (multi-library S3, dpsi backends, checkpointing) - Rework s3_torch_storage.py with multi-library S3 support - Enhance all 4 checkpointing modules (pytorch, pytorch_s3, tf, base) - Remove minio_storage.py and s3dlio_storage.py (consolidated) - Add s3_storage_dpsi.py and s3_torch_storage_dpsi.py (new dpsi backends) - Update storage_factory.py, config.py, utility.py, enumerations.py - Update unet3d S3 workload configs - Update jpeg/png data generators and main.py WIP snapshot: 2026-03-18 * refactor: consolidate S3 storage, fix test output dir, centralise env-var config - Rename s3_torch_storage.py → obj_store_lib.py (multi-library backend) - Delete s3_torch_storage_dpsi.py (dpsi architecture removed) - storage_factory.py: route S3+PyTorch to ObjStoreLibStorage only (no dpsi branch) - config.py: add _load_dotenv() and _apply_env_overrides() per Hari's recommendation Single location for all os.getenv calls; precedence: YAML > env > .env > defaults Introduces DLIO_OUTPUT_FOLDER env var to redirect output directory - conftest.py: set DLIO_OUTPUT_FOLDER=dlio_test_output for all test runs - dlio_benchmark_test.py: inject output.folder via OmegaConf; clean() uses named dir - dlio_s3_benchmark_test.py: same output dir fix + run_benchmark() OmegaConf injection - dlio_aistore_benchmark_test.py: same fix + add missing OmegaConf import * feat: add parallel S3 iterable readers and parquet byte-range support (v3.0.0-beta) Version bump: 2.0.0 → 3.0.0-beta - setup.py: version 3.0.0, Development Status 4-Beta, add 'parquet' extra requiring pyarrow>=12.0.0 New readers (dlio_benchmark/reader/): - npz_reader_s3_iterable.py: NPZReaderS3Iterable — parallel prefetch of all NPZ files assigned to a DLIO worker thread via s3dlio.get_many() (up to 64 concurrent range GETs) or minio ThreadPoolExecutor; eliminates the serial one-round-trip-per-file penalty of the existing NPZReaderS3 - npy_reader_s3_iterable.py: NPYReaderS3Iterable — mirrors NPZ version for raw numpy files (no key extraction) - parquet_reader_s3_iterable.py: ParquetReaderS3Iterable — row-group-granular parquet reader using HTTP byte-range GETs; opens files by reading only the footer, then fetches individual row groups on demand via s3dlio.get_range() or minio.get_object(offset=, length=); LRU-bounded row-group cache; supports optional column projection via storage_options.columns Adapter classes: _S3RangeFile (s3dlio/s3torchconnector) and _MinioRangeFile provide the seekable file-like interface required by pyarrow.parquet Storage and config fixes: - obj_store_lib.py: remove env-var fallbacks (STORAGE_LIBRARY, DLIO_URI_SCHEME, DLIO_OBJECT_KEY_USE_FULL_URI, DLIO_ENDPOINT_URL) — config.py is now the single source of truth; values must flow through storage_options - obj_store_lib.py: fix list_objects() to use s3dlio.list(uri, recursive=True) with correct prefix stripping (removes double-slash and bucket-prefix issues) - config.py: promote storage.storage_library from top-level storage section into storage_options dict so backends can access it consistently Enumerations: - enumerations.py: add FormatType.PARQUET = 'parquet' and get_enum() branch reader_factory.py: - Route FormatType.NPZ + FormatType.NPY to iterable readers when storage_library is s3dlio, s3torchconnector, or minio - Route FormatType.PARQUET to ParquetReaderS3Iterable All three reader variants support s3dlio, s3torchconnector, and minio as interchangeable storage backends via storage_options.storage_library. * feat: add DIRECT_FS storage type and route it in storage_factory Add StorageType.DIRECT_FS = 'direct_fs' to enumerations so that the O_DIRECT backend (via s3dlio direct:// URI) can be selected at runtime. Update storage_factory.py to treat DIRECT_FS identically to LOCAL_FS for DLIO's internal file-listing path; actual I/O is handled by the StreamingCheckpointing layer which routes direct_fs to s3dlio. * feat: add PT_OBJ_SAVE checkpoint type for minio/s3dlio object store backends Introduce a new checkpoint type PT_OBJ_SAVE that routes checkpointing through pytorch_obj_store_checkpointing.py, enabling minio and s3dlio as checkpoint storage backends without requiring s3torchconnector. Key changes: - pytorch_obj_store_checkpointing.py: New checkpoint engine using ObjStoreLibStorage for save/load via minio or s3dlio libraries - pytorch_checkpointing.py: Use _streaming_cache dict keyed by backend type; select direct_fs (O_DIRECT) vs file (fadvise) based on storage_type arg - pytorch_s3_checkpointing.py: S3 backend refinements - checkpointing_factory.py: Route PT_OBJ_SAVE to new class - config.py: Fix storage_library-aware validation; convert OmegaConf DictConfig to plain dict before adding dynamic storage_library key * feat: add multi-library S3 iterable readers with strict isolation and image support SUMMARY ======= This commit consolidates a major enhancement to DLIO's S3 object storage support. All changes support three storage libraries (s3dlio, s3torchconnector, minio) with strict per-library isolation — no silent fallback, no cross-library usage. Failing to install the configured library raises ImportError at construction time with a clear pip install hint. CHANGED FILES ============= dlio_benchmark/reader/npy_reader_s3_iterable.py - REPLACED: Rewrote from scratch. Previously, s3torchconnector branch silently called s3dlio.get_many() — wrong library, wrong behavior, wrong docstring. - NEW: Dedicated _prefetch_s3torchconnector() method using S3IterableDataset .from_objects() with S3ReaderConstructor.sequential() — no s3dlio dependency. - NEW: Early ImportError in __init__ if s3torchconnector not installed. - NEW: Strict per-library dispatch in _prefetch(): s3dlio / s3torchconnector / minio each handled explicitly; raises ValueError for unknown library. - NEW: Full module docstring listing all 3 libraries and strict-isolation warning. - FIXED: s3torchconnector env var not set for s3torchconnector (only s3dlio). dlio_benchmark/reader/npz_reader_s3_iterable.py - FIXED: stale docstrings removed ('listing uses s3dlio' was false for s3torchconnector and minio paths). - IMPROVED: _prefetch_s3dlio uses _BytesViewIO + io.BufferedReader to trigger np.load's readinto() path (in-place copy into numpy buffer) rather than bytes() (separate Python allocation). Peak memory: Rust BytesView only. - IMPROVED: _get_minio_client() cached across epochs for TCP keep-alive; urllib3 PoolManager with retry, timeout, maxsize=16 configuration. - IMPROVED: _prefetch_s3torchconnector() uses S3IterableDataset.from_objects() with sequential() reader (matches npy pattern). - IMPROVED: Module docstring accurately describes all 3 libraries. dlio_benchmark/reader/parquet_reader_s3_iterable.py - FIXED CRITICAL: s3torchconnector previously used s3dlio.get_range() and s3dlio.stat() internally — completely wrong, the docstring lied. - NEW: s3torchconnector uses S3Client.get_object() with S3ReaderConstructor.range_based() returning RangedS3Reader (BufferedIOBase with full seek/tell/read/readinto + SEEK_END). Requires s3torchconnector>=1.3.0. - NEW: Early ImportError + RuntimeError (version check for range_based attr) in __init__ for s3torchconnector — fail at construction, not during I/O. - NEW: self._s3torch_client = S3Client cached at construction time. - NEW: _make_range_file() dispatches to native RangedS3Reader for s3torchconnector; _S3RangeFile for s3dlio; _MinioRangeFile for minio. - FIXED: urllib.parse.urlparse import moved to top-level imports (was duplicated inside branches). - FIXED: Module docstring corrected — removed false 'uses s3dlio as engine'. dlio_benchmark/reader/reader_factory.py - NEW: JPEG/PNG storage_type routing — was completely missing, silently sending S3 workloads to ImageReader (local FS reader that calls PIL.Image.open(path)), which would fail hard with a misleading file-not-found error. - NEW: Routes JPEG and PNG on S3/AIStore with recognized storage_library to ImageReaderS3Iterable; falls back to ImageReader for local FS. - UNCHANGED: NPY/NPZ S3 routing (existing, correct). - UNCHANGED: Parquet always routes to ParquetReaderS3Iterable (by design). dlio_benchmark/reader/image_reader_s3_iterable.py [NEW FILE] - NEW: Parallel-prefetch JPEG/PNG reader for S3-compatible object stores. - Inherits from ImageReader (which inherits from FormatReader) — reuses get_sample, next, read_index from parent chain. - Supports all 3 storage libraries with identical pattern to NPYReaderS3Iterable: _prefetch_s3dlio, _prefetch_s3torchconnector, _prefetch_minio. - __init__: early fail for s3torchconnector (ImportError with pip hint). - open(): returns pre-fetched decoded numpy array from cache. - Uses PIL.Image.open() + np.asarray() for decode (to be removed in follow-up refactoring; only image.nbytes is used for telemetry, not the decoded data). dlio_benchmark/storage/obj_store_lib.py - IMPROVED: ObjStoreLibStorage enhanced for s3torchconnector and minio. - NEW: MinIOAdapter to make Minio client compatible with S3Client-like API. - IMPROVED: list_objects(), get_data(), put_data() all dispatch per library. dlio_benchmark/storage/storage_factory.py - ADDED: ObjStoreLibStorage path for S3 + PyTorch framework combination. - ADDED: AIStore support via AIStoreStorage (guarded import, fails with clear error if aistore package not installed). - DEBUG: Temporary debug prints left in for storage routing visibility. dlio_benchmark/utils/config.py - IMPROVED: storage_options propagated through ConfigArguments. - IMPROVED: storage_library field parsing from YAML. dlio_benchmark/utils/utility.py - IMPROVED: gen_random_tensor() uses dgen-py when available for 30-50x speedup. dlio_benchmark/data_generator/npz_generator.py - FIXED: minor generator compatibility improvement. dlio_benchmark/reader/npy_reader_s3.py dlio_benchmark/reader/npz_reader_s3.py - FIXED: minor compatibility fixes (vestigial sequential readers). README.md - MAJOR: Added comprehensive S3/object storage support documentation: overview, per-library install instructions, configuration YAML examples, run commands for all 3 libraries, timing correctness note. docs/DLIO-Object-Storage_Analysis.md [NEW FILE] - NEW: Analysis of DLIO timing loop behavior with object storage. Documents that measurement semantics are unchanged; S3 I/O occurs inside DataLoader worker prefetch, which is correctly inside the timed region. KNOWN ISSUES / FOLLOW-UP (tracked for next commit) =================================================== - Code replication: NPZ/NPY/Image S3 iterable readers share ~150 lines of identical prefetch logic (uri_for_obj_key, _prefetch_s3dlio/s3torch/minio, _prefetch dispatch, next, read_index). Refactoring to _S3IterableMixin planned as immediate follow-up. - numpy/PIL decode overhead: all three readers decode raw bytes to numpy arrays (np.load, PIL.Image.open + np.asarray) solely to get image.nbytes for telemetry. The actual decoded data is NEVER used — FormatReader.next() always yields self._args.resized_image (pre-allocated random tensor). Replacing decode with len(raw_bytes) eliminates unnecessary CPU work. - Parquet factory: no storage_type guard; configuring parquet + local FS silently constructs ParquetReaderS3Iterable which fails confusingly. - storage_factory.py: debug print() statements to be removed. - Old sequential readers (npy_reader_s3, npz_reader_s3): vestigial, factory no longer routes to them for recognized storage_library values. * refactor: collapse S3 iterable readers into thin subclasses via _S3IterableMixin MOTIVATION ---------- Three reader files (NPZ/NPY/Image) each contained ~250-307 lines of duplicated prefetch logic: per-library dispatch (_prefetch_s3dlio / _prefetch_s3torchconnector / _prefetch_minio), Minio client construction, endpoint env setup, and s3torchconnector import validation. A design review also revealed that all numpy/PIL decode (np.load, PIL.Image.open, np.asarray) inside those prefetch methods was pure CPU overhead whose result was NEVER used — FormatReader.next() always yields self._args.resized_image, a pre-allocated random tensor from config.py, not the actual decoded file data. CHANGES ------- _s3_iterable_mixin.py (new, 328 lines) Shared mixin for all three S3 iterable readers. Contains: - _s3_init(opts): library validation at construction, sets _storage_library / _opts / _object_cache / _minio_client; validates s3torchconnector is importable immediately (not lazily), so misconfiguration fails fast. - _uri_for_obj_key(): s3://bucket/key URI construction. - _get_minio_client(): lazy, cached urllib3.PoolManager + Minio SDK client; reused across epochs to avoid rebuilding TCP connection pools per epoch. - _prefetch_s3dlio(obj_keys) -> {key: len(data)}: parallel get via s3dlio.get_many(); stores ONLY the raw byte count, no numpy decode. - _prefetch_s3torchconnector(obj_keys) -> {key: len}: sequential streaming GET per object via S3IterableDataset.from_objects(); drains the reader with read() for byte count, no numpy decode. - _prefetch_minio(obj_keys) -> {key: len}: ThreadPoolExecutor + Minio.get_object; stores ONLY the raw byte count, no numpy decode. - _prefetch(obj_keys): dispatches to the above three (strict, no fallback). - _s3_prefetch_all(): collects deduplicated obj_keys for the current thread's file_map slice, calls _prefetch(), populates _object_cache. - _s3_ensure_cached(filename): on-demand fetch if filename not in cache. npz_reader_s3_iterable.py (307 lines → 74 lines) Thin subclass of NPZReader + _S3IterableMixin. Overrides: - open(filename): returns self._object_cache.get(filename) (int or None). - close(filename): evicts from cache. - get_sample(filename, sample_index): calls dlp.update(image_size=...) with cached byte count. Does NOT call super() — NPZReader.get_sample() would do open_file_map[filename][..., idx] which fails on an int. - next(): calls _s3_prefetch_all() then delegates to super().next(). - read_index(): calls _s3_ensure_cached() then delegates to super(). npy_reader_s3_iterable.py (254 lines → 107 lines) Thin subclass of NPYReader + _S3IterableMixin. Same override pattern as NPZ. get_sample() does NOT call super() for the same reason (NPYReader.get_sample() also indexes open_file_map[filename][..., sample_index]). image_reader_s3_iterable.py (259 lines → 110 lines) Thin subclass of ImageReader + _S3IterableMixin. Same override pattern. get_sample() additionally calls dft_ai.update(image_size=byte_count) to replicate the second metric update that ImageReader.get_sample() would have performed. Does NOT call super().get_sample() (ImageReader calls .nbytes on the cached value which is an int, not an ndarray). PERFORMANCE IMPACT ------------------ Before: every prefetched object was decoded (np.load / PIL.Image.open / np.asarray), consuming significant CPU time and memory for data that was immediately discarded. After: only len(raw_bytes) is stored. No numpy or PIL imports in any thin subclass. Minio client pooling across epochs reduces TCP setup overhead for all three formats. LINE COUNT SUMMARY ------------------ Before: ~820 lines across 3 files (+ no mixin) After: ~291 lines across 3 files + 328-line mixin = 619 total Net: -201 lines of code, -0 lines of unique logic (all logic is in mixin) * fix: make all 3 storage libraries consistent in _s3_iterable_mixin and thin subclasses - _s3_init(): minio now validates its import EAGERLY at construction time, matching s3dlio (env setup) and s3torchconnector (import check). All three libraries now fail-fast at __init__ instead of deferring minio's failure to the first I/O call. - image_reader_s3_iterable.py: fix stale module docstring that still said 'decodes them with Pillow into a numpy uint8 array' — PIL decode was eliminated in the mixin refactor. - image_reader_s3_iterable.py: remove unused 'import io' and 'import os'. - npy_reader_s3_iterable.py: remove unused 'import os'. * feat: add ParquetReader for local/network filesystems; fix factory routing ParquetReader (parquet_reader.py, new) Filesystem counterpart to ParquetReaderS3Iterable. Uses pyarrow natively (no object-storage adapters) with identical logic: - open(): reads parquet footer, builds cumulative row-group offset list - get_sample(): bisect maps sample_index → row_group, LRU cache bounds memory, reports compressed_bytes to dlp profiler - close(): evicts row-group cache entries for the file - Same options as S3 variant: columns, row_group_cache_size under storage_options - finalize(): clears entire row-group cache reader_factory.py (fix) FormatType.PARQUET was unconditionally routing ALL storage types to ParquetReaderS3Iterable — local filesystem parquet workloads would crash because _S3RangeFile tries to call s3dlio.stat() on a local path. Fixed to match the NPY/NPZ/JPEG pattern: S3 / AIStore → ParquetReaderS3Iterable (existing) local / lustre / etc. → ParquetReader (new) * refactor: AIStore cleanup — remove stale restrictions, debug prints, and unify bucket property - config.py: Remove stale AIStore+PyTorch format restriction (NPZ/NPY only); AIStore reader_factory already routes JPEG/PNG/Parquet to the S3-iterable readers, so the check was blocking valid workloads. - config.py: Remove legacy NPYReaderS3/NPZReaderS3 import validation block; those are the old non-mixin readers, not the path AIStore actually takes. - config.py: Remove [DEBUG LoadConfig] print block (ENTRY + EXIT summaries). - storage_factory.py: Remove all [DEBUG StorageFactory] print statements. - aistore_storage.py: Replace per-method 'if not self.bucket:' guards with a lazy @property that initialises self._bucket on first access; all methods now simply reference self.bucket and get the cached handle automatically. - aistore_storage.py: Fix isfile() which was missing the bucket guard entirely, causing AttributeError if called before any other storage operation. * chore: remove orphaned s3_storage_dpsi.py The file defined its own 'class S3Storage(DataStorage)' — identical name to s3_storage.py — creating a latent class-name collision if ever imported. It was added in commit 14561b8 as a work-in-progress prototype and was immediately superseded by ObjStoreLibStorage in obj_store_lib.py. Zero callers exist anywhere in the codebase (confirmed via grep). * chore: comment out DEBUG print statements in ObjStoreLibStorage All [DEBUG ...] prints in __init__, put_data, and get_data are now commented out rather than deleted, so they can be re-enabled easily during local debugging. The one active print (error in list_objects) is a real error message and is left in place. * storage: convert commented debug prints to logging.debug() in obj_store_lib Replaces the 20 commented-out # print(f'[DEBUG ...]') lines with proper logging.debug() calls, keyed off the existing DLIO_LOG_LEVEL infrastructure. Benefits over # print: - Zero-cost when DLIO_LOG_LEVEL != debug (short-circuit before formatting) - Appears automatically in the log file with timestamps - Includes file path + line number in debug mode - Works for users without source access (no code changes needed to enable) The credentials block uses an isEnabledFor(DEBUG) guard so the src_key/ src_sec/src_ep intermediate vars are only computed when debug is active. Enable with: DLIO_LOG_LEVEL=debug dlio_run ... * feat: multi-library object-store checkpointing (s3dlio / minio / s3torchconnector) Add full multi-library checkpoint support with the following changes: pytorch_obj_store_checkpointing.py: - Unified checkpoint writer/reader for s3dlio, minio, and s3torchconnector via storage_library key in the workload YAML - s3dlio multipart tuning: env-var overrides S3DLIO_MULTIPART_PART_SIZE_MB / S3DLIO_MULTIPART_MAX_IN_FLIGHT; production defaults restored to 128 MB x 8 - Documents v0.9.82 regression (blocking semaphore) and GitHub issue #134 in a large comment block above the s3dlio kwargs section pytorch_s3_checkpointing.py: - Deleted: functionality superseded by pytorch_obj_store_checkpointing.py config.py / enumerations.py: - Recognise storage_library: s3dlio | minio | s3torchconnector from workload YAML - Inject value into storage_options so PyTorchObjStoreCheckpointing can read it - Set correct checkpoint_mechanism and reader_classname per library - Fail fast with clear error if the selected library package is not installed obj_store_lib.py: - Instantiate the correct S3 client based on storage_library selection - s3dlio: PyObjectStoreClient; minio: Minio SDK; s3torch: S3Client _s3_iterable_mixin.py / parquet_reader_s3_iterable.py: - S3 reader cleanups for multi-library correctness tests/dlio_s3_benchmark_test.py: - Update tests to cover multi-library checkpoint paths docs/AIStore_Analysis.md: - New analysis document * feat: full object storage support for all formats — generators, readers, and framework layer All 8 DLIO benchmark formats (npy, npz, hdf5, parquet, csv, jpeg, png, tfrecord) now work correctly end-to-end against object storage (S3/MinIO/GCS/Azure) via the s3dlio storage library. This required fixes spanning data generators, readers, the TensorFlow framework layer, storage factory, config handling, and a 10-bug root-cause analysis. ## Data generators (data_generator/) ### Base class (data_generator.py) - Added _generate_files(write_fn) template method — eliminates ~15-line loop boilerplate duplicated across all 10 generators - Added _file_seed(i) helper: per-file deterministic seed = BASE_SEED + file_index - Added _extract_dims(i) helper for consistent dimension extraction - Migrated all 10 generators to use _generate_files() template ### Bug: np.random.seed(10) — all MPI ranks produce identical data All generators called np.random.seed(10) unconditionally before their write loop. With MPI, every rank wrote the same data to different files, making distributed generation produce duplicate datasets. Fixed with rank-unique per-file seeding via _file_seed(i). ### Bug: NPZ generator passed BytesIO object instead of bytes npz_generator.py called storage.put_data(path, output) where output was a BytesIO object. Fixed to output.getvalue() to pass actual bytes. ### Added object storage support to 6 generators that had none: - hdf5_generator.py: uses h5py core driver with BytesIO backing - csv_generator.py: uses io.StringIO → encode → put_data - tf_generator.py: uses BytesIO + TFRecord framing - indexed_binary_generator.py: uses BytesIO; also replaced legacy np.random global API with gen_random_tensor() (dgen-py, ~155x faster) - synthetic_generator.py: uses BytesIO - parquet_generator.py: uses pyarrow.BufferOutputStream; also replaced legacy np.random global API with gen_random_tensor() ## Read path (reader/) ### Bug: reader_factory.py routed CSV/HDF5/TFRECORD to wrong readers for s3dlio When storage_library=s3dlio, CSV/HDF5/TFRECORD formats were routed to local-file readers that called open() on S3 URIs. Fixed by adding s3dlio dispatch branches that select the new iterable readers. ### Three new S3 iterable readers: - reader/csv_reader_s3_iterable.py: parallel-prefetch CSV reader using s3dlio.get_object() with ThreadPoolExecutor prefetch - reader/hdf5_reader_s3_iterable.py: parallel-prefetch HDF5 reader using h5py core driver over BytesIO from s3dlio - reader/tfrecord_reader_s3_iterable.py: parallel-prefetch TFRecord reader; no protobuf decode (raw tensor extraction); fixed KeyError: -1 when thread_index=-1 by explicitly collecting all file_map values in that case ## TensorFlow framework layer (framework/tf_framework.py) ### Bug: all 7 storage methods used tf.io.gfile for S3 URIs tf.io.gfile does not support s3dlio-managed endpoints or auth. All 7 methods (read, write, delete, stat, listdir, makedirs, exists) were rewritten to dispatch to s3dlio.* for s3://, gs://, and az:// URIs, falling back to tf.io.gfile for local paths. ## Storage factory (storage/storage_factory.py) ### Bug: TENSORFLOW framework type got wrong storage class FrameworkType.TENSORFLOW was not in the ObjStoreLibStorage branch, so TensorFlow workloads got S3Storage which double-mangled S3 URIs. Added TENSORFLOW alongside PYTORCH in the ObjStoreLibStorage dispatch. ## Config handling (utils/config.py) ### Bug: build_sample_map_iter() called os.path.abspath() on S3 URIs os.path.abspath("s3://bucket/path") returns a local path like /current/dir/s3:/bucket/path, breaking all sample map construction for object storage workloads. Fixed with a StorageType.LOCAL_FS guard so abspath() is only called for local filesystem paths. ## AIStore storage (storage/aistore_storage.py) ### Bug: import-time logging.warning() fired unconditionally The except ImportError block emitted a logging.warning() even when aistore was not installed and the user had no intention of using AIStoreStorage. Moved the error to __init__() so it only fires when actually instantiated. ## Tests ### tests/test_data_generator_improvements.py (new, 24 tests) Unit and integration tests covering: - _file_seed() determinism and rank-uniqueness - _generate_files() template invocation count - NPZ BytesIO vs getvalue() correctness - Per-format generator smoke tests (mock storage) - MPI rank seeding uniqueness ### tests/test_s3dlio_object_store.py (new, 8 tests) End-to-end integration tests against real MinIO (opt-in via env var): - Full put + verify + get cycle for all 8 formats - All 8/8 formats confirmed passing: npy, npz, hdf5, parquet, csv, jpeg, png, tfrecord ## Documentation - docs/data_generator_analysis.md: implementation summary covering all bugs fixed, new readers added, test results, and file change inventory * chore: remove backup file from docs * fix: add dgen-py>=0.2.0 as required dependency gen_random_tensor() raises RuntimeError when dgen-py is not installed (used by IndexedBinaryGenerator and ParquetGenerator). Add to core_deps in setup.py and requirements.txt so CI installs it automatically. * fix: make dgen-py a soft dependency with numpy fallback When dgen-py is not installed (e.g. Python 3.9 which dgen-py does not support), gen_random_tensor() now logs a warning and falls back to numpy instead of raising RuntimeError. dgen-py remains the fast default on Python>=3.10 where it is available. Also restrict the pip install marker to python_version>='3.10' in both setup.py and requirements.txt. * ci: drop Python 3.9 and 3.10 from matrix, require 3.11+ * ci: drop Python 3.9/3.10, add dgen-py to requirements-test.txt * fix: soften dgen-py hard fail in config.py, fix CI cache key to include requirements-test.txt * fix: reduce parallelism in failing tests due to over subscription, from np=4 to np=1 * Stabilize DFTracer CI and checkpoint tests on Python 3.12 Summary of changes: - Limit CI Python matrix to 3.12 for deterministic DFTracer/runtime behavior. - Restore AI logging test parallelism to match intended CI behavior: pytest -n 4. - Add fail-fast CI preflight step that hard-fails on missing runtime imports: dftracer.python, dftracer.dftracer, dgen_py. - Add pytest-timeout to setup.py test extras and requirements-test.txt so timeout markers are enforced. - Fix pytest config table name in pyproject.toml: [tool.pytest.ini_options]. - Fix TensorFlow checkpoint path NameError by importing numpy and gen_random_tensor in tf_checkpointing.py. - Remove hard mlpstorage dependency from DLIO packaging to avoid circular dependency. - Add internal fallback backend (simple_streaming_checkpointing.py) for file/direct checkpoint save/load when mlpstorage is unavailable. - Keep object-store checkpointing explicit: raise clear ImportError if mlpstorage is missing when object-store checkpoint backend is selected. Local validation before commit: - Python 3.12 env created and requirements installed. - Targeted checkpoint smoke tests passed (PT/TF train-with-checkpoint + checkpoint-only): 4 passed. - Full checkpoint subset passed with DFTracer enabled: pytest -n 1 -v tests/dlio_ai_logging_test.py -k 'train_with_checkpoint or checkpoint_only' Result: 24 passed. - Full CI-equivalent AI logging command passed with DFTracer enabled: pytest tests/dlio_ai_logging_test.py -n 4 -v Result: 61 passed. * CI: align dgen-py usage with 0.2.2 and Python 3.11+ - keep dftracer imports required but treat dgen_py preflight as optional warning - bump dgen-py minimum to 0.2.2 in setup and requirements - align dgen-py marker to python_version >= 3.11 * Tests: fix output glob path in checkpoint benchmark verification Use os.path.join for *_output.json lookup so checkpoint tests detect rank output files correctly. * Tests: harden output-path checks and checkpoint count arithmetic - replace malformed output globs with os.path.join in benchmark helper tests - use integer arithmetic for expected checkpoint file counts * Tests: suppress Python 3.12 multiprocessing fork deprecation noise Add targeted pytest filter for multiprocessing.popen_fork warning in multi-threaded MPI test processes. * Tests: keep fork warnings visible and fix mocked S3 storage library config - revert pytest warning suppression for multiprocessing fork deprecations - set storage_options.storage_library=s3torchconnector in dlio_s3_benchmark_test fixture * Tests: make object-storage coverage minimal by default - keep small S3/AIStore smoke tests on by default - gate heavy object-storage matrices behind DLIO_OBJECT_STORAGE_EXTENDED=1 - ensure S3 mocks cover s3torchconnector client path - keep live s3dlio integration opt-in and reduce default format breadth * Tests: gate object-storage suites behind opt-in flag - disable S3 and AIStore mock tests unless DLIO_OBJECT_STORAGE_TESTS=1 - require DLIO_OBJECT_STORAGE_TESTS alongside DLIO_S3_INTEGRATION for live s3dlio tests * Skip object storage tests cleanly * Gate object storage CI steps * docs: add I/O issues analysis and executive summary (2026-03-28) Add comprehensive code review of DLIO benchmark I/O design issues covering: - MPI sharding correctness (TFRecord iterative sampler bug) - File vs. object storage reader asymmetry (correctness issue) - JPEG/PNG generator overhead and DALI path analysis - YAML config proliferation and proposed Hydra architecture - read_threads sizing, MPI topology integration - File/object rationalization proposal (Section 13) Also add executive summary version targeting decision-makers, with no code examples and links back to the full document. * ci: remove S3TorchConnector tests from CI workflow * fix: minio connection pool, s3torchconnector bool, obj_store fixes - obj_store_lib.py: handle Python bool for s3_force_path_style (was crashing with s3torchconnector which passes actual booleans from YAML) - obj_store_lib.py: urllib3 PoolManager maxsize=10 (fixes 'Connection pool is full, discarding connection' warnings with minio) - _s3_iterable_mixin.py, parquet_reader_s3_iterable.py: reader fixes - pytorch_checkpointing.py, pytorch_obj_store_checkpointing.py: fixes - utility.py: storage utility improvements - __init__.py: version/init updates * Add parquet configuration options to ConfigArguments and LoadConfig (#9) * Add parquet configuration options to ConfigArguments and LoadConfig * Make record_size a dynamic property * Optimize Parquet generation: pre-generate full table, zero-copy slice per row-group (#10) Reduces generation call overhead from (num_batches × num_columns) to num_columns per file. Arrow's table.slice() is zero-copy, so peak RAM holds one full file's column data but eliminates repeated dgen-py/numpy calls per batch. Based on PR#10 from mlcommons/DLIO_local_changes (not yet merged upstream). * Add uv support: [project] table in pyproject.toml + uv.lock - Add PEP 621 [project] table to pyproject.toml for uv compatibility - Add readme field to fix setuptools _long_description crash - Add optional-dependencies and entry_points for uv extras - setup.py left untouched for backward compatibility with pip/setuptools - Add uv.lock for reproducible uv environments - Add docs/DLIO_PR_Plan_2026-04-10.md: 8-PR implementation plan * chore: ignore coderag index and fastembed cache directories .coderag/ — local code-intelligence index generated by 'coderag index .' .fastembed_cache/ — ONNX model weights downloaded by coderag on first run Neither directory contains source code; both are regenerated on demand and must not be committed to the repository. * test: Batch E — test infra hardening, disable dftracer, spawn MP, fix warnings - Disable dftracer entirely: no import, always-active no-op stubs in utility.py; remove dftracer globals/calls from main.py; configure_dftracer/finalize_dftracer become no-ops in config.py; set_dftracer_initialize/finalize kept as no-ops - Change default multiprocessing_context from 'fork' to 'spawn' (avoids deadlocks in multi-threaded test processes); remove fork guard from configure_dlio_logging - Fix pin_memory: AND with torch.cuda.is_available() so no UserWarning on CPU hosts - Fix NumPy empty-slice warnings: guard io_save/duration_save stats with len() > 0 check, consistent with existing io_load guard in statscounter.py - Object-storage tests strictly opt-in via DLIO_OBJECT_STORAGE_TESTS=1 env var - Add DALI skip guards; make dftracer optional in pyproject.toml/setup.py - Fix DLIOMPI singleton reset in all test finalize() methods - Fix generate_random_shape to use seeded RNG (deterministic) - Remove dead duplicate OmegaConf call in test_npy_reader_compatibility - Remove dlp_logger/dftracer finalizer from TorchDataset worker * fix(readers+gen): PR-2/3 — local-FS reader parity + JPEG/PNG fast generation PR-2: Skip CPU decode and add parallel prefetch to local-FS readers S3 iterable readers pre-fetch all files in parallel (queue depth=N) before the iteration loop. Local-FS readers were opening and decoding files one at a time (queue depth=1). This structural asymmetry makes local-FS benchmarks artificially slow relative to their physical bandwidth. New _LocalFSIterableMixin (_local_fs_iterable_mixin.py): - _localfs_prefetch_all(): ThreadPoolExecutor parallel reads before next() - Stores only raw byte count per file (same pattern as _S3IterableMixin) - No numpy / PIL / h5py decode — those allocate and immediately discard data Applied to: ImageReader, NPYReader, HDF5Reader, NPZReader - open() returns cached byte count instead of decoded array - get_sample() uses byte count directly for image_size telemetry - next() calls _localfs_prefetch_all() before iterating PR-3: JPEG/PNG generator raw-bytes fast path (Option A) PIL encode costs ~30ms/file (JPEG) or ~100-200ms/file (PNG). Since PR-2 readers now only measure raw byte counts and never decode content, the encode step is pure overhead that does not affect benchmark results. When data_loader != native_dali: write records.tobytes() directly. - ~1000-4000x faster for large synthetic datasets - Safe: readers in PR-2 never decode content When data_loader == native_dali: keep full PIL encode (fn.decoders.image() requires a valid JPEG/PNG bitstream). * fix(config): PR-1/4/5 — iterative sampler bug, multiprocessing_context auto-derive, read_threads auto-sizing PR-1: build_sample_map_iter file-index reset for non-zero ranks The local sample_index counter was resetting file_index back to rank-0's partition on every iteration after the first. Fix: carry the rank offset (my_rank * files_per_rank) forward through all iterations so each rank stays in its own file partition. PR-4: multiprocessing_context auto-derive from storage_library s3dlio and s3torchconnector initialize async runtimes at import time. fork()-based DataLoader workers inherit broken file-descriptors. Auto-set multiprocessing_context='spawn' when storage_library is one of these and the user has not overridden the default. PR-5: read_threads auto-sizing Default read_threads=1 leaves I/O bandwidth on the table on modern NVMe and NVMe-oF systems. When read_threads==1 (the 'user did not set this' sentinel), auto-size to min(cpu_count // comm_size, 8) and log the choice. * test: fix test_npy_reader_compatibility for LocalFSIterableMixin design NPYReader.open() returns int byte count on this branch (not ndarray) — that is correct by design: _LocalFSIterableMixin skips decode and caches only byte counts for storage-bandwidth benchmarking parity with _S3IterableMixin. Update assertion to match actual contract and verify file content directly via np.load. * test: Batch E — test infra hardening, disable dftracer, spawn MP, fix warnings - Disable dftracer entirely: no import, always-active no-op stubs in utility.py; remove dftracer globals/calls from main.py; configure_dftracer/finalize_dftracer become no-ops in config.py; set_dftracer_initialize/finalize kept as no-ops - Change default multiprocessing_context from 'fork' to 'spawn' (avoids deadlocks in multi-threaded test processes); remove fork guard from configure_dlio_logging - Fix pin_memory: AND with torch.cuda.is_available() so no UserWarning on CPU hosts - Fix NumPy empty-slice warnings: guard io_save/duration_save stats with len() > 0 check, consistent with existing io_load guard in statscounter.py - Object-storage tests strictly opt-in via DLIO_OBJECT_STORAGE_TESTS=1 env var - Add DALI skip guards; make dftracer optional in pyproject.toml/setup.py - Fix DLIOMPI singleton reset in all test finalize() methods - Fix generate_random_shape to use seeded RNG (deterministic) - Remove dead duplicate OmegaConf call in test_npy_reader_compatibility - Remove dlp_logger/dftracer finalizer from TorchDataset worker * test: add PR verification benchmarks and report (April 12, 2026) - bench_generation.py: JPEG/PNG fast-path vs PIL encode speedup - bench_readers.py: reader parity baseline benchmark - bench_readers2.py: decode cost isolation + parallel prefetch analysis - bench_config_fixes.py: 16-case behavioral verification of config.py fixes (iterative sampler bug, multiprocessing_context auto-derive, read_threads auto-size) - dlio_fix_verification_report.md: full results report for PR submissions All scripts run via: uv run python tests/PRs-12-Apr-26/<script>.py * fix: parallel generation, storage env vars, MPI topology, settle guard (Issues 9,10,11,12,13,6b) PR-13 (Issue 12): Add ranks_per_node() to DLIOMPI; use it for read/write thread auto-sizing instead of total comm_size so multi-node configs divide CPUs by ranks-per-node, not total world size. PR-14 (Issues 10+11+6b): Parallel data generation via ThreadPoolExecutor. - Add write_threads field to ConfigArguments (default 1, auto-sized like read_threads) - Rewrite _generate_files() with two-phase design: pre-derive RNG seeds sequentially (preserves determinism), then dispatch writes in parallel worker threads - Each worker gets its own np.random.default_rng(seed) — no shared state - Identical output with any write_threads value (determinism guaranteed) - Add clarifying comment to DataGenerator.__init__ re: Issue 6b (non-bug confirm) PR-12 (Issue 9): Storage environment-variable overrides in _apply_env_overrides(). - DLIO_STORAGE_LIBRARY → storage_options['storage_library'] - DLIO_BUCKET → storage_root (if unset) - DLIO_STORAGE_TYPE → storage_type (if unset) - AWS_ACCESS_KEY_ID → storage_options['access_key_id'] - AWS_SECRET_ACCESS_KEY→ storage_options['secret_access_key'] - AWS_ENDPOINT_URL → storage_options['endpoint_url'] - AWS_REGION → storage_options['region'] - YAML/CLI values always win (env vars only fill unset fields) - Optional dotenv dict support with env vars taking priority PR-15 (Issue 13): Post-generation settle guard for eventual-consistency stores. - Add post_generation_settle_seconds: float = 0.0 to ConfigArguments - Load from config['storage']['post_generation_settle_seconds'] in LoadConfig() - Add _apply_settle_guard(args, comm) to main.py: rank-0 sleeps then barrier - Guard is no-op for LOCAL_FS or when settle=0 (zero behaviour change by default) - Call after generation barrier in DLIOBenchmark.initialize() Tests: tests/test_remaining_issues.py — 28 tests, all pass * fix: cap auto-sized thread count in tests and CI (DLIO_MAX_AUTO_THREADS) The parallel generation feature (PR #12) auto-sizes write_threads and read_threads to cpu_count // ranks_per_node, which on a 64-core dev box produced 8 threads per test. Running the full suite with that many threads caused resource contention between concurrent mpirun subprocesses, leading to intermittent failures (empty stdout/stderr, return code 1). Changes: - config.py: replace hardcoded cap of 8 with int(os.environ.get('DLIO_MAX_AUTO_THREADS', '8')) so the ceiling is overridable without code changes - tests/conftest.py: set DLIO_MAX_AUTO_THREADS=2 via os.environ.setdefault so all tests (local and CI) run with a known bounded thread count - .github/workflows/ci.yml: add DLIO_MAX_AUTO_THREADS: '2' to the job env block so GitHub Actions runners (2 vCPUs) are not over-committed No behaviour change for production runs (default cap remains 8). * chore: remove uv.lock, internal docs, and bench scripts from upstream PR * chore: lock Python to 3.12 only (>=3.12,<3.13) * fix: add pydftracer to setup.py test deps for Preflight C extension check * fix: restore pydftracer to core_deps (was removed in earlier commit, breaks via-setup Preflight) * fix: add pydftracer to pyproject.toml dependencies (pip ignores setup.py install_requires when [project] table exists) * fix: add dftracer>=2.0.1 to pyproject.toml test extras (provides C extension dftracer.dftracer required by Preflight) * fix: add pyproject.toml to CI cache key to prevent stale venv reuse * fix: drop incomplete last batch in FormatReader.next() to match drop_last=True semantics PyTorch DataLoader uses drop_last=True which discards the final incomplete batch. The TF path goes through FormatReader.next() which was padding the last batch to batch_size — causing extra fetch_iter events and failing assertions: expected_iters = num_epochs * (num_data_pp // batch_size) # floor division Removing the padding means an incomplete last batch is silently dropped (the yield only fires when len(batch) == batch_size). This matches PyTorch behavior and fixes test_ai_logging_train[tensorflow-*] failures. * fix: pin read_threads=1 in test_ai_logging_train to avoid per-thread batch count mismatch Auto-threading (PR-5/PR-13) can set read_threads>1 in CI. With multiple TF interleave threads, samples are partitioned per-thread, so each thread yields floor(samples_per_thread/batch_size) batches — the sum is less than floor(total_samples/batch_size), breaking the assertion: expected_iters = num_epochs * (num_data_pp // batch_size) e.g. 10 samples, bs=2, read_threads=2: expected = 10//2 = 5 but actual = 2*(floor(5/2)) = 4 The test was written assuming single-threaded iteration. Pin read_threads=1 to match that assumption. test_ai_logging_train_with_step is unaffected as it already parametrizes read_threads explicitly. * fix: restore dftracer loading — conditional import from DFTRACER_ENABLE env var The d9f175b commit hardcoded DFTRACER_ENABLE=False and replaced all dftracer code with permanent no-op stubs. This broke dlio_ai_logging_test.py entirely, since those tests require actual .pfw trace files to be written. - utility.py: restore conditional dftracer.python import; fall back to no-op stubs only when the library is absent or raises ImportError - config.py: restore configure_dftracer() to call PerfTrace.initialize_log() when DFTRACER_ENABLE is True; restore finalize_dftracer() to flush - main.py: restore dftracer_initialize/finalize/dftracer globals, the call to configure_dftracer() in __init__(), and the finalize_dftracer() call in finalize(); restore set_dftracer_initialize/finalize to update globals CI sets DFTRACER_ENABLE=1, which now causes the real dftracer C extension to initialize and write .pfw trace files, allowing test_ai_logging to pass. * fix: add missing global declarations for dftracer in __init__ and finalize() Without 'global dftracer' in __init__, the assignment: dftracer = self.args.configure_dftracer(...) created a local variable, leaving the module-level dftracer=None. finalize() then saw None and never called finalize_dftracer(). TensorFlow calls os._exit() during shutdown, bypassing Python atexit handlers — so without explicit finalize_dftracer(), the .pfw trace file is never flushed. PyTorch exits cleanly so dftracer's atexit fires as a fallback, which is why pytorch tests passed but tensorflow tests did not. * ci: remove test_ai_logging step — dftracer integration tests not needed * ci: replace push/PR trigger with fast CI suite; keep integration tests as manual - Rename ci.yml → integration.yml; change trigger to workflow_dispatch only so the full 21-suite run is manual-only (GitHub Actions UI). - Add .github/workflows/fast-ci.yml: 3-leg matrix (via-uv, via-setup, via-reqs) that runs on every push and PR, completes in < 10 minutes. - Add tests/test_fast_ci.py: 65-test fast CI suite covering preflight imports, enumerations, utilities, config defaults/derive, generator and reader factories, NPY/NPZ/HDF5/image I/O, MPI smoke (np=2), and an end-to-end generate+train pipeline. * ci: add pyarrow to requirements-test.txt for via-reqs leg pyarrow is a core dependency in pyproject.toml but was missing from requirements-test.txt, causing test_pyarrow to fail in the via-reqs matrix leg. --------- Signed-off-by: Abhishek Gaikwad <gaikwadabhishek1997@gmail.com> Signed-off-by: Denis Barakhtanov <dbarahtanov@enakta.com> Co-authored-by: Darien Imai <941951+dpsi@users.noreply.github.com> Co-authored-by: Izzet Yildirim <yildirim2@llnl.gov> Co-authored-by: Abhishek Gaikwad <gaikwadabhishek1997@gmail.com> Co-authored-by: enakta <140368024+enakta@users.noreply.github.com> Co-authored-by: Denis Barakhtanov <denis.barahtanov@gmail.com> Co-authored-by: Wolfgang De Salvador <118554802+wolfgang-desalvador@users.noreply.github.com>
Merge Conflicts - Now re-implementing.This IS a good change, except now there are merge conflicts. Thank you for the work Wolfgang, here is an analysis of the improvements using dgen-py streaming data generation. The new dgen-py streaming method has linear memory usage, maxing out at 64 MB vs. old method required entire Parquet object to reside in memory, potentially using GB's. Also, new streaming method is about 2 - 3x faster. However, this step is no longer the bottleneck. The problem has now moved to being the slow, PyArrow memory encoding, which will require another set of changes. I will update this PR branch to accommodate the new streaming method and list Wolfgang as a co-author. Probably easiest to just close this PR. dgen-py 0.2.3 Data Generation Benchmark ResultsPlatform: 12 logical CPUs Method Summary
ResultsAnalysisThroughputFor files ≥ 100 MB — the typical case for AI/ML training data — the new path delivers 2–3x higher throughput:
The BEFORE path plateaus at ~18 GB/s because a new Rayon thread pool is created on every The AFTER path stabilizes at ~40 GB/s and has not hit its ceiling even at 10 GB — this is the machine's memory bandwidth limit, not a dgen-py limit. MemoryThe RAM reduction is the more important improvement for large files:
With the old API, generating a single 10 GB file required 10+ GB of RAM to be live simultaneously. This made large-file generation impractical on memory-constrained nodes and completely impossible if multiple MPI ranks shared a node. With the new API, peak RAM is always 64 MB, regardless of file size. An 8-rank node generating 10 GB files previously required 80+ GB of RAM. Now it requires ~512 MB total. Small file note (1 MB)The reported 470x speedup at 1 MB is an artifact of Rayon thread pool startup latency (~1 ms), which dominates the measurement for sub-2ms operations. Both paths produce the same amount of data; the BEFORE path simply pays a fixed pool-creation cost that is disproportionate at this scale. At typical training file sizes (100 MB–10 GB), the real speedup is 2–3x. 10 MB boundary behaviorAt 10 MB, both methods measure ~5–6 GB/s with near-identical times. This size sits just below the 32 MB Key Takeaways
|
This pull request optimizes the data generation process in
parquet_generator.pyby reducing redundant function calls and improving batch processing efficiency. The main change is to pre-generate all column data for the entire file before batching, which reduces overhead and leverages zero-copy slicing for batch creation.Performance optimizations:
_generate_batch_columnsor_generate_legacy_batch, reducing the number of function calls from (num_batches * num_columns) to just num_columns.full_table.slice(...)), which is more efficient and avoids repeated data generation.