
Speed-up Parquet data generation #10

Open
wolfgang-desalvador wants to merge 1 commit into mlcommons:main from
wolfgang-desalvador:wdesalvador/improve-parquet-data-generation

Conversation

@wolfgang-desalvador

This pull request optimizes the data generation process in parquet_generator.py by reducing redundant function calls and improving batch processing efficiency. The main change is to pre-generate all column data for the entire file before batching, which reduces overhead and leverages zero-copy slicing for batch creation.

Performance optimizations:

  • All column data for the entire file is now generated upfront using either _generate_batch_columns or _generate_legacy_batch, reducing the number of function calls from (num_batches * num_columns) to just num_columns.
  • During batch processing, each batch is now created by slicing the pre-generated full table (full_table.slice(...)), which is more efficient and avoids repeated data generation.

@wolfgang-desalvador
Author

This method needs to be validated @russfellows @wvaske @FileSystemGuy, since it requires each process to be able to keep the whole 3+ GiB Parquet file in memory.
This may call for guidance, or for a way to use bigger chunks.

@russfellows

This looks like a good change. I will try it out and see whether any further optimizations can be made as well. Thanks, Wolfgang

russfellows added a commit to russfellows/dlio_benchmark that referenced this pull request Apr 10, 2026
… into row groups

Based on mlcommons#10 (Wolfgang De Salvador).
Generate all column data in one pass before the batch loop, then use
pa.Table.slice() (zero-copy in Arrow) to produce each row-group batch.

Reduces generation call overhead from (num_batches × num_columns) to
just num_columns calls. For a file with 10 batches and 5 columns this
is a 10× reduction in gen_random_tensor calls.

Improvement over upstream PR: added explicit memory trade-off comment
and clarified the zero-copy slice semantics.
@wolfgang-desalvador
Author

@mlcommons/wg-storage-approvers Could you take a look at this PR?

@russfellows

russfellows commented Apr 14, 2026 via email

FileSystemGuy pushed a commit that referenced this pull request Apr 16, 2026
…d, CI thread cap (#12)

* Fix s3pytorch force path style boolean option.

* Refactor S3 pytorch implementation.
Change code to use storage_root config option and namespace.
Removes urlparsing for each I/O.
Updates some default config options to be sane for both file and object.

* feat: Add multi-library S3 storage support (minio, s3dlio, s3torch)

- Add StorageLibrary enum to consistently select S3 libraries
- Refactor storage_factory to route to selected library backends
- Implement MinIO storage backend with MPI rank-based endpoint selection
- Implement s3dlio storage backend with native multi-endpoint support
- Enable comparison testing across S3 client libraries

This enables DLIO benchmarks to test different S3 client implementations
for performance comparison and multi-endpoint load balancing strategies.

* refactor: convert direct imports to lazy imports in profiler_factory (#325)

- Move profiler imports inside get_profiler() method
- Benefits:
  - Avoids loading TFProfiler (which imports tensorflow) unless needed
  - Reduces import overhead for users not using TENSORBOARD profiler
  - Default profiler (IOSTAT) no longer triggers tensorflow import
- No breaking changes - same API, same behavior
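The pattern, in a minimal illustrative form (the class and profiler names are simplified, and `decimal` merely stands in for the heavy tensorflow import):

```python
class ProfilerFactory:
    """Sketch of the lazy-import pattern described above; names are
    illustrative, not DLIO's exact API."""

    @staticmethod
    def get_profiler(profiler_type):
        if profiler_type == "tensorboard":
            # The heavy dependency is imported only inside this branch,
            # so the default path never pays its import cost.
            import decimal as heavy_dep  # stands in for tensorflow
            return ("tf-profiler", heavy_dep.__name__)
        # Default (IOSTAT-style) profiler: no heavy imports triggered.
        return ("iostat-profiler", None)
```

The public API is unchanged: callers still invoke `get_profiler()`; only the moment the dependency is loaded moves.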

* feat: add native AIStore storage backend (#321)

Add a native AIStore storage handler that uses the official AIStore
Python SDK for direct access, bypassing the S3 compatibility layer
for better performance and simpler configuration.

Changes:
- Add AIStoreStorage class with full CRUD operations, range reads,
  and prefix-based object listing
- Add StorageType.AISTORE enum and wire it through StorageFactory,
  GeneratorFactory, and ReaderFactory (reuses S3 generators/readers)
- Add AIStore endpoint configuration support in ConfigArguments
- Add 'aistore' optional dependency in setup.py
- Add mock-based test suite with full AIStore SDK simulation
- Add CI workflow for AIStore tests
- Add storage configuration section to documentation

Supported formats: NPY, NPZ, JPEG
Supported frameworks: PyTorch, TensorFlow
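The enum-plus-factory wiring can be sketched as follows. This is illustrative routing only: the real StorageFactory takes more arguments, and the AIStore branch is shown purely for its guarded-import shape.

```python
from enum import Enum

class StorageType(Enum):
    LOCAL_FS = "local_fs"   # existing members elided for brevity
    S3 = "s3"
    AISTORE = "aistore"     # the addition described above

def get_storage(storage_type, namespace):
    """Illustrative routing only; return values are placeholders."""
    if storage_type == StorageType.AISTORE:
        # Guarded import so a missing SDK fails with a clear install hint
        try:
            from aistore.sdk import Client  # noqa: F401
        except ImportError as exc:
            raise ImportError(
                "AIStore backend selected; pip install aistore to use it"
            ) from exc
        return f"aistore-storage:{namespace}"
    if storage_type == StorageType.LOCAL_FS:
        return f"file-storage:{namespace}"
    raise ValueError(f"unsupported storage type: {storage_type}")
```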

Signed-off-by: Abhishek Gaikwad <gaikwadabhishek1997@gmail.com>

* fix(counters): train phase was not evaluated (#328)

* fix(counters): train phase was not evaluated

PR #302 moved the loop-breaking condition from the end of the loop to
its start. As a result, self.stats.end_block for the current block never
fires, because the iteration that would emit it never starts.

Running a regular PyTorch loader from a local FS:

```
[OUTPUT] 2026-02-27T06:58:50.214359 Running DLIO [Training & Evaluation] with 2 process(es)
[WARNING] The amount of dataset is smaller than the host memory; data might be cached after the first epoch. Increase the size of dataset to eliminate the caching effect!!!
[OUTPUT] 2026-02-27T06:58:50.229669 Max steps per epoch: 128 = 1 * 1024 / 4 / 2 (samples per file * num files / batch size / comm size)
[OUTPUT] 2026-02-27T06:58:50.229764 Steps per eval: 32 = 1 * 64 / 1 / 2 (samples per file * num files / batch size eval / comm size)
[OUTPUT] 2026-02-27T06:58:50.278417 Starting epoch 1: 128 steps expected
[OUTPUT] 2026-02-27T06:58:50.278614 Starting block 1
[OUTPUT] 2026-02-27T06:59:03.743752 Ending epoch 1 - 128 steps completed in 13.47 s
[OUTPUT] 2026-02-27T06:59:03.747196 Starting eval - 32 steps expected
[OUTPUT] 2026-02-27T06:59:07.122980 Ending eval - 32 steps completed in 3.38 s
[OUTPUT] 2026-02-27T06:59:07.124598 Epoch 1 [Eval] Accelerator Utilization [AU] (%): 99.4141
[OUTPUT] 2026-02-27T06:59:07.124644 Epoch 1 [Eval] Throughput (samples/second): 18.9592
[OUTPUT] 2026-02-27T06:59:07.130596 Starting epoch 2: 128 steps expected
[OUTPUT] 2026-02-27T06:59:07.130832 Starting block 1
[OUTPUT] 2026-02-27T06:59:20.047588 Ending epoch 2 - 128 steps completed in 12.92 s
[OUTPUT] 2026-02-27T06:59:20.048553 Starting eval - 32 steps expected
[OUTPUT] 2026-02-27T06:59:23.276666 Ending eval - 32 steps completed in 3.23 s
[OUTPUT] 2026-02-27T06:59:23.277556 Epoch 2 [Eval] Accelerator Utilization [AU] (%): 99.4022
[OUTPUT] 2026-02-27T06:59:23.277595 Epoch 2 [Eval] Throughput (samples/second): 19.8261
[OUTPUT] 2026-02-27T06:59:23.280422 Starting epoch 3: 128 steps expected
[OUTPUT] 2026-02-27T06:59:23.280591 Starting block 1
[OUTPUT] 2026-02-27T06:59:36.196122 Ending epoch 3 - 128 steps completed in 12.92 s
[OUTPUT] 2026-02-27T06:59:36.197005 Starting eval - 32 steps expected
[OUTPUT] 2026-02-27T06:59:39.425806 Ending eval - 32 steps completed in 3.23 s
[OUTPUT] 2026-02-27T06:59:39.426645 Epoch 3 [Eval] Accelerator Utilization [AU] (%): 99.4032
[OUTPUT] 2026-02-27T06:59:39.426682 Epoch 3 [Eval] Throughput (samples/second): 19.8219
[OUTPUT] 2026-02-27T06:59:39.469524 Saved outputs in /lus/flare/projects/DAOS_Testing/PAP166/hydra_log/default/2026-02-27-06-58-50
[OUTPUT] Averaged metric over all steps/epochs
[METRIC] ==========================================================
[METRIC] Number of Simulated Accelerators: 2
[METRIC] Training Accelerator Utilization [AU] (%): 0.0000 (0.0000)
[METRIC] Training Throughput (samples/second): 0.0000 (0.0000)
[METRIC] Training I/O Throughput (MB/second): 0.0000 (0.0000)
[METRIC] train_au_meet_expectation: fail
[METRIC] Eval Accelerator Utilization [AU] (%): 49.7048 (0.0028)
[METRIC] Eval Throughput (samples/second): 9.765259 (0.206374)
[METRIC] Eval Throughput (MB/second): 0.038146 (0.000806)
[METRIC] eval_au_meet_expectation: fail
[METRIC] ==========================================================

[OUTPUT] 2026-02-27T06:59:39.484237 outputs saved in RANKID_output.json
```

Notice that the logs only show the start of the block, never its
ending.
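The regression comes down to where the break check sits. A standalone model (names are illustrative, not DLIO's actual code):

```python
def train_block(num_batches, max_steps, buggy):
    """Model of the control-flow bug: moving the break check from the
    END of the loop body to its START skips the end-of-block stats."""
    events = ["start_block"]
    step = 0
    for _ in range(num_batches):
        if buggy and step >= max_steps:
            # Checked BEFORE the step runs: the branch below that
            # records 'end_block' is never reached
            break
        step += 1  # simulated training step
        if not buggy and step >= max_steps:
            events.append("end_block")  # end-of-block accounting
            break
    return events
```

With `buggy=True` and `max_steps` equal to the number of batches, the loop simply exhausts its iterator and `end_block` is never recorded, matching the truncated logs above.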

After the fix:
```
[OUTPUT] 2026-02-28T12:30:28.000590 Running DLIO [Training & Evaluation] with 2 process(es)
[WARNING] The amount of dataset is smaller than the host memory; data might be cached after the first epoch. Increase the size of dataset to eliminate the caching effect!!!
[WARNING] Number of files for training in /dataset/train (4000) is more than requested (64). A subset of files will be used
[WARNING] Number of files for training in /dataset/train (4000) is more than requested (64). A subset of files will be used
[OUTPUT] 2026-02-28T12:30:28.102857 Max steps per epoch: 8 = 1 * 64 / 4 / 2 (samples per file * num files / batch size / comm size)
[OUTPUT] 2026-02-28T12:30:28.102992 Steps per eval: 4 = 1 * 8 / 1 / 2 (samples per file * num files / batch size eval / comm size)
[OUTPUT] 2026-02-28T12:30:30.572480 Starting epoch 1: 8 steps expected
[OUTPUT] 2026-02-28T12:30:30.573084 Starting block 1
[OUTPUT] 2026-02-28T12:30:30.734535 Ending block 1 - 8 steps completed in 0.16 s
[OUTPUT] 2026-02-28T12:30:30.740906 Epoch 1 - Block 1 [Training] Accelerator Utilization [AU] (%): 0.1428
[OUTPUT] 2026-02-28T12:30:30.740994 Epoch 1 - Block 1 [Training] Throughput (samples/second): 1753.1357
[OUTPUT] 2026-02-28T12:30:30.741060 Epoch 1 - Block 1 [Training] Computation time per step (second): 0.0000+/-0.0000 (set value: {})
[OUTPUT] 2026-02-28T12:30:30.741497 Ending epoch 1 - 8 steps completed in 0.17 s
[OUTPUT] 2026-02-28T12:30:30.742789 Starting eval - 4 steps expected
[OUTPUT] 2026-02-28T12:30:30.889307 Ending eval - 4 steps completed in 0.15 s
[OUTPUT] 2026-02-28T12:30:30.891985 Epoch 1 [Eval] Accelerator Utilization [AU] (%): 0.0720
[OUTPUT] 2026-02-28T12:30:30.892054 Epoch 1 [Eval] Throughput (samples/second): 54.6620
[OUTPUT] 2026-02-28T12:30:30.900919 Starting epoch 2: 8 steps expected
[OUTPUT] 2026-02-28T12:30:30.901249 Starting block 1
[OUTPUT] 2026-02-28T12:30:30.914273 Ending block 1 - 8 steps completed in 0.01 s
[OUTPUT] 2026-02-28T12:30:30.915472 Epoch 2 - Block 1 [Training] Accelerator Utilization [AU] (%): 1.9055
[OUTPUT] 2026-02-28T12:30:30.915541 Epoch 2 - Block 1 [Training] Throughput (samples/second): 7765.7316
[OUTPUT] 2026-02-28T12:30:30.915595 Epoch 2 - Block 1 [Training] Computation time per step (second): 0.0000+/-0.0000 (set value: {})
[OUTPUT] 2026-02-28T12:30:30.915931 Ending epoch 2 - 8 steps completed in 0.02 s
[OUTPUT] 2026-02-28T12:30:30.917061 Starting eval - 4 steps expected
[OUTPUT] 2026-02-28T12:30:30.958733 Ending eval - 4 steps completed in 0.04 s
[OUTPUT] 2026-02-28T12:30:30.959729 Epoch 2 [Eval] Accelerator Utilization [AU] (%): 0.0381
[OUTPUT] 2026-02-28T12:30:30.959768 Epoch 2 [Eval] Throughput (samples/second): 192.2493
[OUTPUT] 2026-02-28T12:30:30.960091 Starting epoch 3: 8 steps expected
[OUTPUT] 2026-02-28T12:30:30.960275 Starting block 1
[OUTPUT] 2026-02-28T12:30:30.976061 Ending block 1 - 8 steps completed in 0.02 s
[OUTPUT] 2026-02-28T12:30:30.977423 Epoch 3 - Block 1 [Training] Accelerator Utilization [AU] (%): 0.6369
[OUTPUT] 2026-02-28T12:30:30.977483 Epoch 3 - Block 1 [Training] Throughput (samples/second): 6020.3520
[OUTPUT] 2026-02-28T12:30:30.977534 Epoch 3 - Block 1 [Training] Computation time per step (second): 0.0000+/-0.0000 (set value: {})
[OUTPUT] 2026-02-28T12:30:30.977792 Ending epoch 3 - 8 steps completed in 0.02 s
[OUTPUT] 2026-02-28T12:30:30.978884 Starting eval - 4 steps expected
[OUTPUT] 2026-02-28T12:30:30.983803 Ending eval - 4 steps completed in 0.00 s
[OUTPUT] 2026-02-28T12:30:30.984927 Epoch 3 [Eval] Accelerator Utilization [AU] (%): 1.3682
[OUTPUT] 2026-02-28T12:30:30.984986 Epoch 3 [Eval] Throughput (samples/second): 1641.1245
[OUTPUT] 2026-02-28T12:30:30.986010 Saved outputs in /home/denis/dev/enakta/dlio_benchmark/hydra_log/default/2026-02-28-12-30-25
[OUTPUT] Averaged metric over all steps/epochs
[METRIC] ==========================================================
[METRIC] Number of Simulated Accelerators: 2
[METRIC] Training Accelerator Utilization [AU] (%): 0.5939 (0.4129)
[METRIC] Training Throughput (samples/second): 4948.3957 (2466.6534)
[METRIC] Training I/O Throughput (MB/second): 19.3297 (9.6354)
[METRIC] train_au_meet_expectation: fail
[METRIC] Eval Accelerator Utilization [AU] (%): 0.4704 (0.5038)
[METRIC] Eval Throughput (samples/second): 444.414075 (396.070635)
[METRIC] Eval Throughput (MB/second): 1.735992 (1.547151)
[METRIC] eval_au_meet_expectation: fail
[METRIC] ==========================================================

[OUTPUT] 2026-02-28T12:30:30.987839 outputs saved in RANKID_output.json
```

Signed-off-by: Denis Barakhtanov <dbarahtanov@enakta.com>

* fix: remove unreachable branch

Signed-off-by: Denis Barakhtanov <dbarahtanov@enakta.com>

---------

Signed-off-by: Denis Barakhtanov <dbarahtanov@enakta.com>
Co-authored-by: Denis Barakhtanov <denis.barahtanov@gmail.com>

* refactor(generators): unify generators to work with any storage backend (#329)

Every new storage backend required copy-pasting each generator into an
_XXX sibling file: npz_generator_s3.py, npy_generator_s3.py and so on.
The only difference was whether to write the output locally on disk,
directly via numpy/PIL, or via the storage interface.
This made the pattern unsustainable: two duplicated formats today, more
with each new backend, incurring a significant maintenance burden.

Since all generators already had a storage instance and used it to
generate file names, we can leverage it.

The single set of generators can now check whether the storage is locally
available via `islocalfs` and apply local optimisations, if any. If the
storage is not local, the sample is serialized to io.BytesIO,
buf.getvalue() is called, and the bytes are delegated to
self.storage.put_data(). All storage backends receive plain bytes, as the
storage interface was designed, removing the type inspection, seek() and
getvalue() calls scattered across backends.
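A sketch of the unified write path, assuming a storage object that exposes `islocalfs()` and `put_data(path, data)` as described above (the generator class itself is illustrative):

```python
import io
import numpy as np

class UnifiedNPZGenerator:
    """Illustrative unified generator: one code path for all backends."""

    def __init__(self, storage):
        self.storage = storage

    def write_sample(self, path, records):
        if self.storage.islocalfs():
            # Local optimisation: let numpy write straight to disk
            np.savez(path, x=records)
        else:
            # Serialize to an in-memory buffer and hand plain bytes
            # to the storage backend, as its interface expects
            buf = io.BytesIO()
            np.savez(buf, x=records)
            self.storage.put_data(path, buf.getvalue())
```

Adding a new backend then requires no generator changes at all: only `put_data` must accept bytes.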

- FileStorage.put_data was never called, had text-mode open and a double
  get_uri call (once from the generator, once inside put_data itself).
  Now it is the default write path for LOCAL_FS, used by almost every
  workload config. get_data aligned to binary mode ("rb") for consistency.
- AIStoreStorage.put_data: remove isinstance dispatch, accept bytes directly.
- S3TorchStorage.put_data: remove data.getvalue() — just write data.
- generator_factory: removed S3/AIStore branching for NPZ, NPY, JPEG.
- factory referenced jpeg_generator_s3.JPEGGeneratorS3 which never existed;
  JPEG + S3/AIStore would crash at import time.

After this patch, adding a new storage backend requires no changes in any
generator. Adding a new data format automatically works with all backends.

Signed-off-by: Denis Barakhtanov <dbarahtanov@enakta.com>
Co-authored-by: Denis Barakhtanov <denis.barahtanov@gmail.com>

* feat: object storage integration work-in-progress (multi-library S3, dpsi backends, checkpointing)

- Rework s3_torch_storage.py with multi-library S3 support
- Enhance all 4 checkpointing modules (pytorch, pytorch_s3, tf, base)
- Remove minio_storage.py and s3dlio_storage.py (consolidated)
- Add s3_storage_dpsi.py and s3_torch_storage_dpsi.py (new dpsi backends)
- Update storage_factory.py, config.py, utility.py, enumerations.py
- Update unet3d S3 workload configs
- Update jpeg/png data generators and main.py

WIP snapshot: 2026-03-18

* refactor: consolidate S3 storage, fix test output dir, centralise env-var config

- Rename s3_torch_storage.py → obj_store_lib.py (multi-library backend)
- Delete s3_torch_storage_dpsi.py (dpsi architecture removed)
- storage_factory.py: route S3+PyTorch to ObjStoreLibStorage only (no dpsi branch)
- config.py: add _load_dotenv() and _apply_env_overrides() per Hari's recommendation
  Single location for all os.getenv calls; precedence: YAML > env > .env > defaults
  Introduces DLIO_OUTPUT_FOLDER env var to redirect output directory
- conftest.py: set DLIO_OUTPUT_FOLDER=dlio_test_output for all test runs
- dlio_benchmark_test.py: inject output.folder via OmegaConf; clean() uses named dir
- dlio_s3_benchmark_test.py: same output dir fix + run_benchmark() OmegaConf injection
- dlio_aistore_benchmark_test.py: same fix + add missing OmegaConf import
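The stated precedence (YAML > env > .env > defaults) can be sketched as a single resolver. The DLIO_OUTPUT_FOLDER name follows the commit message; the function itself is illustrative:

```python
import os

def resolve_output_folder(yaml_value=None,
                          env_var="DLIO_OUTPUT_FOLDER",
                          dotenv=None,
                          default="hydra_log"):
    """Apply the precedence chain YAML > env > .env > default."""
    if yaml_value is not None:
        return yaml_value                 # explicit config always wins
    if env_var in os.environ:
        return os.environ[env_var]        # process environment next
    if dotenv and env_var in dotenv:
        return dotenv[env_var]            # .env file values third
    return default                        # built-in fallback last
```

Centralising all `os.getenv` calls in one place makes the precedence auditable instead of scattered across modules.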

* feat: add parallel S3 iterable readers and parquet byte-range support (v3.0.0-beta)

Version bump: 2.0.0 → 3.0.0-beta
- setup.py: version 3.0.0, Development Status :: 4 - Beta, add 'parquet'
  extra requiring pyarrow>=12.0.0

New readers (dlio_benchmark/reader/):
- npz_reader_s3_iterable.py: NPZReaderS3Iterable — parallel prefetch of all
  NPZ files assigned to a DLIO worker thread via s3dlio.get_many() (up to 64
  concurrent range GETs) or minio ThreadPoolExecutor; eliminates the serial
  one-round-trip-per-file penalty of the existing NPZReaderS3
- npy_reader_s3_iterable.py: NPYReaderS3Iterable — mirrors NPZ version for
  raw numpy files (no key extraction)
- parquet_reader_s3_iterable.py: ParquetReaderS3Iterable — row-group-granular
  parquet reader using HTTP byte-range GETs; opens files by reading only the
  footer, then fetches individual row groups on demand via s3dlio.get_range()
  or minio.get_object(offset=, length=); LRU-bounded row-group cache;
  supports optional column projection via storage_options.columns
  Adapter classes: _S3RangeFile (s3dlio/s3torchconnector) and _MinioRangeFile
  provide the seekable file-like interface required by pyarrow.parquet

Storage and config fixes:
- obj_store_lib.py: remove env-var fallbacks (STORAGE_LIBRARY, DLIO_URI_SCHEME,
  DLIO_OBJECT_KEY_USE_FULL_URI, DLIO_ENDPOINT_URL) — config.py is now the
  single source of truth; values must flow through storage_options
- obj_store_lib.py: fix list_objects() to use s3dlio.list(uri, recursive=True)
  with correct prefix stripping (removes double-slash and bucket-prefix issues)
- config.py: promote storage.storage_library from top-level storage section
  into storage_options dict so backends can access it consistently

Enumerations:
- enumerations.py: add FormatType.PARQUET = 'parquet' and get_enum() branch

reader_factory.py:
- Route FormatType.NPZ + FormatType.NPY to iterable readers when
  storage_library is s3dlio, s3torchconnector, or minio
- Route FormatType.PARQUET to ParquetReaderS3Iterable

All three reader variants support s3dlio, s3torchconnector, and minio as
interchangeable storage backends via storage_options.storage_library.
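The adapter idea behind _S3RangeFile can be sketched as a seekable, read-only file-like object over a byte-range fetch callable. Here `fetch_range` and `size` stand in for the real `s3dlio.get_range` / stat calls; an object of this shape is what pyarrow.parquet needs in order to read only the footer and then individual row groups on demand:

```python
import io

class RangeFile(io.RawIOBase):
    """Read-only seekable file backed by byte-range GETs (sketch)."""

    def __init__(self, size, fetch_range):
        self._size = size          # total object size (from a stat call)
        self._fetch = fetch_range  # (offset, length) -> bytes
        self._pos = 0

    def readable(self):
        return True

    def seekable(self):
        return True

    def tell(self):
        return self._pos

    def seek(self, offset, whence=io.SEEK_SET):
        # SEEK_END support matters: parquet footers are read from the end
        if whence == io.SEEK_SET:
            self._pos = offset
        elif whence == io.SEEK_CUR:
            self._pos += offset
        elif whence == io.SEEK_END:
            self._pos = self._size + offset
        return self._pos

    def read(self, n=-1):
        if n < 0:
            n = self._size - self._pos
        n = max(0, min(n, self._size - self._pos))
        data = self._fetch(self._pos, n)  # one byte-range GET
        self._pos += len(data)
        return data
```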

* feat: add DIRECT_FS storage type and route it in storage_factory

Add StorageType.DIRECT_FS = 'direct_fs' to enumerations so that the
O_DIRECT backend (via s3dlio direct:// URI) can be selected at runtime.
Update storage_factory.py to treat DIRECT_FS identically to LOCAL_FS
for DLIO's internal file-listing path; actual I/O is handled by the
StreamingCheckpointing layer which routes direct_fs to s3dlio.

* feat: add PT_OBJ_SAVE checkpoint type for minio/s3dlio object store backends

Introduce a new checkpoint type PT_OBJ_SAVE that routes checkpointing
through pytorch_obj_store_checkpointing.py, enabling minio and s3dlio
as checkpoint storage backends without requiring s3torchconnector.

Key changes:
- pytorch_obj_store_checkpointing.py: New checkpoint engine using
  ObjStoreLibStorage for save/load via minio or s3dlio libraries
- pytorch_checkpointing.py: Use _streaming_cache dict keyed by backend
  type; select direct_fs (O_DIRECT) vs file (fadvise) based on
  storage_type arg
- pytorch_s3_checkpointing.py: S3 backend refinements
- checkpointing_factory.py: Route PT_OBJ_SAVE to new class
- config.py: Fix storage_library-aware validation; convert OmegaConf
  DictConfig to plain dict before adding dynamic storage_library key

* feat: add multi-library S3 iterable readers with strict isolation and image support

SUMMARY
=======
This commit consolidates a major enhancement to DLIO's S3 object storage support.
All changes support three storage libraries (s3dlio, s3torchconnector, minio) with
strict per-library isolation — no silent fallback, no cross-library usage. Failing
to install the configured library raises ImportError at construction time with a
clear pip install hint.

CHANGED FILES
=============

dlio_benchmark/reader/npy_reader_s3_iterable.py
  - REPLACED: Rewrote from scratch. Previously, s3torchconnector branch silently
    called s3dlio.get_many() — wrong library, wrong behavior, wrong docstring.
  - NEW: Dedicated _prefetch_s3torchconnector() method using S3IterableDataset
    .from_objects() with S3ReaderConstructor.sequential() — no s3dlio dependency.
  - NEW: Early ImportError in __init__ if s3torchconnector not installed.
  - NEW: Strict per-library dispatch in _prefetch(): s3dlio / s3torchconnector /
    minio each handled explicitly; raises ValueError for unknown library.
  - NEW: Full module docstring listing all 3 libraries and strict-isolation warning.
  - FIXED: s3torchconnector env var not set for s3torchconnector (only s3dlio).

dlio_benchmark/reader/npz_reader_s3_iterable.py
  - FIXED: stale docstrings removed ('listing uses s3dlio' was false for
    s3torchconnector and minio paths).
  - IMPROVED: _prefetch_s3dlio uses _BytesViewIO + io.BufferedReader to trigger
    np.load's readinto() path (in-place copy into numpy buffer) rather than
    bytes() (separate Python allocation). Peak memory: Rust BytesView only.
  - IMPROVED: _get_minio_client() cached across epochs for TCP keep-alive;
    urllib3 PoolManager with retry, timeout, maxsize=16 configuration.
  - IMPROVED: _prefetch_s3torchconnector() uses S3IterableDataset.from_objects()
    with sequential() reader (matches npy pattern).
  - IMPROVED: Module docstring accurately describes all 3 libraries.

dlio_benchmark/reader/parquet_reader_s3_iterable.py
  - FIXED CRITICAL: s3torchconnector previously used s3dlio.get_range() and
    s3dlio.stat() internally — completely wrong, the docstring lied.
  - NEW: s3torchconnector uses S3Client.get_object() with
    S3ReaderConstructor.range_based() returning RangedS3Reader (BufferedIOBase
    with full seek/tell/read/readinto + SEEK_END). Requires s3torchconnector>=1.3.0.
  - NEW: Early ImportError + RuntimeError (version check for range_based attr)
    in __init__ for s3torchconnector — fail at construction, not during I/O.
  - NEW: self._s3torch_client = S3Client cached at construction time.
  - NEW: _make_range_file() dispatches to native RangedS3Reader for
    s3torchconnector; _S3RangeFile for s3dlio; _MinioRangeFile for minio.
  - FIXED: urllib.parse.urlparse import moved to top-level imports (was
    duplicated inside branches).
  - FIXED: Module docstring corrected — removed false 'uses s3dlio as engine'.

dlio_benchmark/reader/reader_factory.py
  - NEW: JPEG/PNG storage_type routing — was completely missing, silently sending
    S3 workloads to ImageReader (local FS reader that calls PIL.Image.open(path)),
    which would fail hard with a misleading file-not-found error.
  - NEW: Routes JPEG and PNG on S3/AIStore with recognized storage_library to
    ImageReaderS3Iterable; falls back to ImageReader for local FS.
  - UNCHANGED: NPY/NPZ S3 routing (existing, correct).
  - UNCHANGED: Parquet always routes to ParquetReaderS3Iterable (by design).

dlio_benchmark/reader/image_reader_s3_iterable.py  [NEW FILE]
  - NEW: Parallel-prefetch JPEG/PNG reader for S3-compatible object stores.
  - Inherits from ImageReader (which inherits from FormatReader) — reuses
    get_sample, next, read_index from parent chain.
  - Supports all 3 storage libraries with identical pattern to NPYReaderS3Iterable:
    _prefetch_s3dlio, _prefetch_s3torchconnector, _prefetch_minio.
  - __init__: early fail for s3torchconnector (ImportError with pip hint).
  - open(): returns pre-fetched decoded numpy array from cache.
  - Uses PIL.Image.open() + np.asarray() for decode (to be removed in follow-up
    refactoring; only image.nbytes is used for telemetry, not the decoded data).

dlio_benchmark/storage/obj_store_lib.py
  - IMPROVED: ObjStoreLibStorage enhanced for s3torchconnector and minio.
  - NEW: MinIOAdapter to make Minio client compatible with S3Client-like API.
  - IMPROVED: list_objects(), get_data(), put_data() all dispatch per library.

dlio_benchmark/storage/storage_factory.py
  - ADDED: ObjStoreLibStorage path for S3 + PyTorch framework combination.
  - ADDED: AIStore support via AIStoreStorage (guarded import, fails with clear
    error if aistore package not installed).
  - DEBUG: Temporary debug prints left in for storage routing visibility.

dlio_benchmark/utils/config.py
  - IMPROVED: storage_options propagated through ConfigArguments.
  - IMPROVED: storage_library field parsing from YAML.

dlio_benchmark/utils/utility.py
  - IMPROVED: gen_random_tensor() uses dgen-py when available for 30-50x speedup.

dlio_benchmark/data_generator/npz_generator.py
  - FIXED: minor generator compatibility improvement.

dlio_benchmark/reader/npy_reader_s3.py
dlio_benchmark/reader/npz_reader_s3.py
  - FIXED: minor compatibility fixes (vestigial sequential readers).

README.md
  - MAJOR: Added comprehensive S3/object storage support documentation:
    overview, per-library install instructions, configuration YAML examples,
    run commands for all 3 libraries, timing correctness note.

docs/DLIO-Object-Storage_Analysis.md  [NEW FILE]
  - NEW: Analysis of DLIO timing loop behavior with object storage.
    Documents that measurement semantics are unchanged; S3 I/O occurs inside
    DataLoader worker prefetch, which is correctly inside the timed region.

KNOWN ISSUES / FOLLOW-UP (tracked for next commit)
===================================================
  - Code replication: NPZ/NPY/Image S3 iterable readers share ~150 lines of
    identical prefetch logic (uri_for_obj_key, _prefetch_s3dlio/s3torch/minio,
    _prefetch dispatch, next, read_index). Refactoring to _S3IterableMixin
    planned as immediate follow-up.
  - numpy/PIL decode overhead: all three readers decode raw bytes to numpy arrays
    (np.load, PIL.Image.open + np.asarray) solely to get image.nbytes for
    telemetry. The actual decoded data is NEVER used — FormatReader.next() always
    yields self._args.resized_image (pre-allocated random tensor). Replacing
    decode with len(raw_bytes) eliminates unnecessary CPU work.
  - Parquet factory: no storage_type guard; configuring parquet + local FS
    silently constructs ParquetReaderS3Iterable which fails confusingly.
  - storage_factory.py: debug print() statements to be removed.
  - Old sequential readers (npy_reader_s3, npz_reader_s3): vestigial, factory
    no longer routes to them for recognized storage_library values.

* refactor: collapse S3 iterable readers into thin subclasses via _S3IterableMixin

MOTIVATION
----------
Three reader files (NPZ/NPY/Image) each contained ~250-307 lines of duplicated
prefetch logic: per-library dispatch (_prefetch_s3dlio / _prefetch_s3torchconnector
/ _prefetch_minio), Minio client construction, endpoint env setup, and s3torchconnector
import validation. A design review also revealed that all numpy/PIL decode (np.load,
PIL.Image.open, np.asarray) inside those prefetch methods was pure CPU overhead whose
result was NEVER used — FormatReader.next() always yields self._args.resized_image,
a pre-allocated random tensor from config.py, not the actual decoded file data.

CHANGES
-------
_s3_iterable_mixin.py (new, 328 lines)
  Shared mixin for all three S3 iterable readers. Contains:
  - _s3_init(opts): library validation at construction, sets _storage_library /
    _opts / _object_cache / _minio_client; validates s3torchconnector is importable
    immediately (not lazily), so misconfiguration fails fast.
  - _uri_for_obj_key(): s3://bucket/key URI construction.
  - _get_minio_client(): lazy, cached urllib3.PoolManager + Minio SDK client;
    reused across epochs to avoid rebuilding TCP connection pools per epoch.
  - _prefetch_s3dlio(obj_keys) -> {key: len(data)}: parallel get via
    s3dlio.get_many(); stores ONLY the raw byte count, no numpy decode.
  - _prefetch_s3torchconnector(obj_keys) -> {key: len}: sequential streaming GET
    per object via S3IterableDataset.from_objects(); drains the reader with read()
    for byte count, no numpy decode.
  - _prefetch_minio(obj_keys) -> {key: len}: ThreadPoolExecutor + Minio.get_object;
    stores ONLY the raw byte count, no numpy decode.
  - _prefetch(obj_keys): dispatches to the above three (strict, no fallback).
  - _s3_prefetch_all(): collects deduplicated obj_keys for the current thread's
    file_map slice, calls _prefetch(), populates _object_cache.
  - _s3_ensure_cached(filename): on-demand fetch if filename not in cache.

npz_reader_s3_iterable.py (307 lines → 74 lines)
  Thin subclass of NPZReader + _S3IterableMixin. Overrides:
  - open(filename): returns self._object_cache.get(filename) (int or None).
  - close(filename): evicts from cache.
  - get_sample(filename, sample_index): calls dlp.update(image_size=...) with
    cached byte count. Does NOT call super() — NPZReader.get_sample() would do
    open_file_map[filename][..., idx] which fails on an int.
  - next(): calls _s3_prefetch_all() then delegates to super().next().
  - read_index(): calls _s3_ensure_cached() then delegates to super().

npy_reader_s3_iterable.py (254 lines → 107 lines)
  Thin subclass of NPYReader + _S3IterableMixin. Same override pattern as NPZ.
  get_sample() does NOT call super() for the same reason (NPYReader.get_sample()
  also indexes open_file_map[filename][..., sample_index]).

image_reader_s3_iterable.py (259 lines → 110 lines)
  Thin subclass of ImageReader + _S3IterableMixin. Same override pattern.
  get_sample() additionally calls dft_ai.update(image_size=byte_count) to
  replicate the second metric update that ImageReader.get_sample() would have
  performed. Does NOT call super().get_sample() (ImageReader calls .nbytes on
  the cached value which is an int, not an ndarray).

PERFORMANCE IMPACT
------------------
Before: every prefetched object was decoded (np.load / PIL.Image.open / np.asarray),
consuming significant CPU time and memory for data that was immediately discarded.
After: only len(raw_bytes) is stored. No numpy or PIL imports in any thin subclass.
Minio client pooling across epochs reduces TCP setup overhead for all three formats.

LINE COUNT SUMMARY
------------------
  Before: ~820 lines across 3 files (+ no mixin)
  After:  ~291 lines across 3 files + 328-line mixin = 619 total
  Net:    -201 lines of code, -0 lines of unique logic (all logic is in mixin)
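The core of the mixin pattern, reduced to its essentials (the fetch callable stands in for the per-library GET implementations; only byte counts are cached, never decoded arrays):

```python
from concurrent.futures import ThreadPoolExecutor

class S3IterableMixin:
    """Sketch of the shared prefetch mixin described above."""

    def _s3_init(self, fetch):
        self._fetch = fetch          # (key) -> bytes, per-library GET
        self._object_cache = {}      # key -> byte count only

    def _prefetch(self, keys):
        # Parallel prefetch; store len(raw_bytes), never a decoded array
        with ThreadPoolExecutor(max_workers=8) as pool:
            sizes = pool.map(lambda k: len(self._fetch(k)), keys)
        self._object_cache.update(zip(keys, sizes))

    def ensure_cached(self, key):
        # On-demand fetch if a key was missed by the bulk prefetch
        if key not in self._object_cache:
            self._object_cache[key] = len(self._fetch(key))
        return self._object_cache[key]
```

Thin reader subclasses then only need to consult `_object_cache` for the byte count they report to the profiler.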

* fix: make all 3 storage libraries consistent in _s3_iterable_mixin and thin subclasses

- _s3_init(): minio now validates its import EAGERLY at construction time,
  matching s3dlio (env setup) and s3torchconnector (import check). All three
  libraries now fail-fast at __init__ instead of deferring minio's failure
  to the first I/O call.
- image_reader_s3_iterable.py: fix stale module docstring that still said
  'decodes them with Pillow into a numpy uint8 array' — PIL decode was
  eliminated in the mixin refactor.
- image_reader_s3_iterable.py: remove unused 'import io' and 'import os'.
- npy_reader_s3_iterable.py: remove unused 'import os'.

* feat: add ParquetReader for local/network filesystems; fix factory routing

ParquetReader (parquet_reader.py, new)
  Filesystem counterpart to ParquetReaderS3Iterable. Uses pyarrow natively
  (no object-storage adapters) with identical logic:
  - open(): reads parquet footer, builds cumulative row-group offset list
  - get_sample(): bisect maps sample_index → row_group, LRU cache bounds
    memory, reports compressed_bytes to dlp profiler
  - close(): evicts row-group cache entries for the file
  - Same options as S3 variant: columns, row_group_cache_size under storage_options
  - finalize(): clears entire row-group cache

reader_factory.py (fix)
  FormatType.PARQUET was unconditionally routing ALL storage types to
  ParquetReaderS3Iterable — local filesystem parquet workloads would crash
  because _S3RangeFile tries to call s3dlio.stat() on a local path.
  Fixed to match the NPY/NPZ/JPEG pattern:
    S3 / AIStore  → ParquetReaderS3Iterable (existing)
    local / lustre / etc. → ParquetReader (new)
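The bisect-based mapping described in get_sample() can be sketched as follows. This is a hypothetical illustration of the technique, not the reader's actual code; `cumulative_rows` and `locate_row_group` are invented names.

```python
import bisect

# cumulative_rows[i] = total number of rows in row groups 0..i-1.
def locate_row_group(cumulative_rows, sample_index):
    """Map a global sample index to (row_group, offset_within_group)."""
    # bisect_right finds the first cumulative offset strictly greater
    # than sample_index; subtracting 1 gives the containing row group.
    rg = bisect.bisect_right(cumulative_rows, sample_index) - 1
    return rg, sample_index - cumulative_rows[rg]

# Example: three row groups of 100, 50, and 200 rows.
cumulative = [0, 100, 150]
print(locate_row_group(cumulative, 120))  # → (1, 20)
```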

* refactor: AIStore cleanup — remove stale restrictions, debug prints, and unify bucket property

- config.py: Remove stale AIStore+PyTorch format restriction (NPZ/NPY only);
  AIStore reader_factory already routes JPEG/PNG/Parquet to the S3-iterable
  readers, so the check was blocking valid workloads.
- config.py: Remove legacy NPYReaderS3/NPZReaderS3 import validation block;
  those are the old non-mixin readers, not the path AIStore actually takes.
- config.py: Remove [DEBUG LoadConfig] print block (ENTRY + EXIT summaries).
- storage_factory.py: Remove all [DEBUG StorageFactory] print statements.
- aistore_storage.py: Replace per-method 'if not self.bucket:' guards with a
  lazy @property that initialises self._bucket on first access; all methods
  now simply reference self.bucket and get the cached handle automatically.
- aistore_storage.py: Fix isfile() which was missing the bucket guard entirely,
  causing AttributeError if called before any other storage operation.
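The lazy-property pattern replacing the per-method guards looks roughly like this sketch. `FakeClient` and the class name stand in for the real aistore client and storage class:

```python
# Hypothetical sketch of the lazy bucket property described above.
class FakeClient:
    def __init__(self):
        self.calls = 0
    def bucket(self, name):
        self.calls += 1
        return f"bucket:{name}"

class AIStoreStorageSketch:
    def __init__(self, client, bucket_name):
        self._client = client
        self._bucket_name = bucket_name
        self._bucket = None          # initialised on first access

    @property
    def bucket(self):
        # Replaces per-method 'if not self.bucket:' guards: every method
        # simply reads self.bucket and gets the cached handle.
        if self._bucket is None:
            self._bucket = self._client.bucket(self._bucket_name)
        return self._bucket

client = FakeClient()
storage = AIStoreStorageSketch(client, "data")
storage.bucket; storage.bucket       # second access hits the cache
```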

* chore: remove orphaned s3_storage_dpsi.py

The file defined its own 'class S3Storage(DataStorage)' — identical name to
s3_storage.py — creating a latent class-name collision if ever imported.
It was added in commit 14561b8 as a work-in-progress prototype and was
immediately superseded by ObjStoreLibStorage in obj_store_lib.py.
Zero callers exist anywhere in the codebase (confirmed via grep).

* chore: comment out DEBUG print statements in ObjStoreLibStorage

All [DEBUG ...] prints in __init__, put_data, and get_data are now
commented out rather than deleted, so they can be re-enabled easily
during local debugging. The one active print (error in list_objects)
is a real error message and is left in place.

* storage: convert commented debug prints to logging.debug() in obj_store_lib

Replaces the 20 commented-out # print(f'[DEBUG ...]') lines with proper
logging.debug() calls, keyed off the existing DLIO_LOG_LEVEL infrastructure.

Benefits over # print:
- Zero-cost when DLIO_LOG_LEVEL != debug (short-circuit before formatting)
- Appears automatically in the log file with timestamps
- Includes file path + line number in debug mode
- Works for users without source access (no code changes needed to enable)

The credentials block uses an isEnabledFor(DEBUG) guard so the src_key/
src_sec/src_ep intermediate vars are only computed when debug is active.

Enable with: DLIO_LOG_LEVEL=debug dlio_run ...
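The isEnabledFor guard described above can be sketched like this; the logger name and option keys are illustrative (the source only names the src_key/src_sec/src_ep intermediates):

```python
import io
import logging

logger = logging.getLogger("dlio.sketch")
logger.setLevel(logging.DEBUG)
stream = io.StringIO()
logger.addHandler(logging.StreamHandler(stream))

def log_credentials_debug(storage_options):
    # Guard: the intermediate vars below are only computed when debug
    # logging is actually active (zero cost otherwise).
    if logger.isEnabledFor(logging.DEBUG):
        src_key = storage_options.get("access_key_id")
        src_ep = storage_options.get("endpoint_url")
        logger.debug("S3 credentials: key=%s endpoint=%s", src_key, src_ep)

log_credentials_debug({"access_key_id": "AK", "endpoint_url": "http://minio:9000"})
```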

* feat: multi-library object-store checkpointing (s3dlio / minio / s3torchconnector)

Add full multi-library checkpoint support with the following changes:

pytorch_obj_store_checkpointing.py:
- Unified checkpoint writer/reader for s3dlio, minio, and s3torchconnector via
  storage_library key in the workload YAML
- s3dlio multipart tuning: env-var overrides S3DLIO_MULTIPART_PART_SIZE_MB /
  S3DLIO_MULTIPART_MAX_IN_FLIGHT; production defaults restored to 128 MB x 8
- Documents v0.9.82 regression (blocking semaphore) and GitHub issue #134
  in a large comment block above the s3dlio kwargs section

pytorch_s3_checkpointing.py:
- Deleted: functionality superseded by pytorch_obj_store_checkpointing.py

config.py / enumerations.py:
- Recognise storage_library: s3dlio | minio | s3torchconnector from workload YAML
- Inject value into storage_options so PyTorchObjStoreCheckpointing can read it
- Set correct checkpoint_mechanism and reader_classname per library
- Fail fast with clear error if the selected library package is not installed

obj_store_lib.py:
- Instantiate the correct S3 client based on storage_library selection
- s3dlio: PyObjectStoreClient; minio: Minio SDK; s3torch: S3Client

_s3_iterable_mixin.py / parquet_reader_s3_iterable.py:
- S3 reader cleanups for multi-library correctness

tests/dlio_s3_benchmark_test.py:
- Update tests to cover multi-library checkpoint paths

docs/AIStore_Analysis.md:
- New analysis document

* feat: full object storage support for all formats — generators, readers, and framework layer

All 8 DLIO benchmark formats (npy, npz, hdf5, parquet, csv, jpeg, png, tfrecord)
now work correctly end-to-end against object storage (S3/MinIO/GCS/Azure) via the
s3dlio storage library. This required fixes spanning data generators, readers,
the TensorFlow framework layer, storage factory, config handling, and a 10-bug
root-cause analysis.

## Data generators (data_generator/)

### Base class (data_generator.py)
- Added _generate_files(write_fn) template method — eliminates ~15-line loop
  boilerplate duplicated across all 10 generators
- Added _file_seed(i) helper: per-file deterministic seed = BASE_SEED + file_index
- Added _extract_dims(i) helper for consistent dimension extraction
- Migrated all 10 generators to use _generate_files() template

### Bug: np.random.seed(10) — all MPI ranks produce identical data
All generators called np.random.seed(10) unconditionally before their write loop.
With MPI, every rank wrote the same data to different files, making distributed
generation produce duplicate datasets. Fixed with rank-unique per-file seeding via
_file_seed(i).
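A minimal sketch of rank-unique per-file seeding, assuming the seed is derived from rank and file index (the exact formula in _file_seed may differ):

```python
import numpy as np

BASE_SEED = 10   # the old global seed mentioned above, kept as the base

def file_seed(my_rank, files_per_rank, file_index):
    # Hypothetical: a deterministic seed that is unique per (rank, file).
    return BASE_SEED + my_rank * files_per_rank + file_index

# Rank 0 and rank 1 writing the first file of their own partitions
# now draw from different random streams:
a = np.random.default_rng(file_seed(0, 4, 0)).integers(0, 255, 8)
b = np.random.default_rng(file_seed(1, 4, 0)).integers(0, 255, 8)
```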

### Bug: NPZ generator passed BytesIO object instead of bytes
npz_generator.py called storage.put_data(path, output) where output was a BytesIO
object. Fixed to output.getvalue() to pass actual bytes.
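The fix is the standard BytesIO-to-bytes conversion; a minimal reproduction:

```python
import io
import numpy as np

# np.savez writes into a BytesIO; put_data needs the underlying bytes,
# not the buffer object itself.
output = io.BytesIO()
np.savez(output, x=np.arange(4))
data = output.getvalue()          # actual bytes for storage.put_data()
assert isinstance(data, bytes)
assert data[:2] == b"PK"          # .npz is a zip archive
```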

### Added object storage support to 6 generators that had none:
- hdf5_generator.py: uses h5py core driver with BytesIO backing
- csv_generator.py: uses io.StringIO → encode → put_data
- tf_generator.py: uses BytesIO + TFRecord framing
- indexed_binary_generator.py: uses BytesIO; also replaced legacy np.random global
  API with gen_random_tensor() (dgen-py, ~155x faster)
- synthetic_generator.py: uses BytesIO
- parquet_generator.py: uses pyarrow.BufferOutputStream; also replaced legacy
  np.random global API with gen_random_tensor()

## Read path (reader/)

### Bug: reader_factory.py routed CSV/HDF5/TFRECORD to wrong readers for s3dlio
When storage_library=s3dlio, CSV/HDF5/TFRECORD formats were routed to local-file
readers that called open() on S3 URIs. Fixed by adding s3dlio dispatch branches
that select the new iterable readers.

### Three new S3 iterable readers:
- reader/csv_reader_s3_iterable.py: parallel-prefetch CSV reader using
  s3dlio.get_object() with ThreadPoolExecutor prefetch
- reader/hdf5_reader_s3_iterable.py: parallel-prefetch HDF5 reader using h5py
  core driver over BytesIO from s3dlio
- reader/tfrecord_reader_s3_iterable.py: parallel-prefetch TFRecord reader; no
  protobuf decode (raw tensor extraction); fixed KeyError: -1 when thread_index=-1
  by explicitly collecting all file_map values in that case

## TensorFlow framework layer (framework/tf_framework.py)

### Bug: all 7 storage methods used tf.io.gfile for S3 URIs
tf.io.gfile does not support s3dlio-managed endpoints or auth. All 7 methods
(read, write, delete, stat, listdir, makedirs, exists) were rewritten to dispatch
to s3dlio.* for s3://, gs://, and az:// URIs, falling back to tf.io.gfile for
local paths.

## Storage factory (storage/storage_factory.py)

### Bug: TENSORFLOW framework type got wrong storage class
FrameworkType.TENSORFLOW was not in the ObjStoreLibStorage branch, so TensorFlow
workloads got S3Storage which double-mangled S3 URIs. Added TENSORFLOW alongside
PYTORCH in the ObjStoreLibStorage dispatch.

## Config handling (utils/config.py)

### Bug: build_sample_map_iter() called os.path.abspath() on S3 URIs
os.path.abspath("s3://bucket/path") returns a local path like
/current/dir/s3:/bucket/path, breaking all sample map construction for object
storage workloads. Fixed with a StorageType.LOCAL_FS guard so abspath() is only
called for local filesystem paths.
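The failure mode and the guard can be reproduced in a few lines (the real check uses StorageType.LOCAL_FS; the boolean here is a simplification):

```python
import os

# abspath() treats the URI as a relative local path, and its internal
# normpath() collapses the double slash after 's3:'.
mangled = os.path.abspath("s3://bucket/path")
assert not mangled.startswith("s3://")      # e.g. /cwd/s3:/bucket/path

# Sketch of the guard: only local filesystem paths get abspath().
def normalize(path, is_local_fs):
    return os.path.abspath(path) if is_local_fs else path

assert normalize("s3://bucket/path", is_local_fs=False) == "s3://bucket/path"
```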

## AIStore storage (storage/aistore_storage.py)

### Bug: import-time logging.warning() fired unconditionally
The except ImportError block emitted a logging.warning() even when aistore was
not installed and the user had no intention of using AIStoreStorage. Moved the
error to __init__() so it only fires when actually instantiated.

## Tests

### tests/test_data_generator_improvements.py (new, 24 tests)
Unit and integration tests covering:
- _file_seed() determinism and rank-uniqueness
- _generate_files() template invocation count
- NPZ BytesIO vs getvalue() correctness
- Per-format generator smoke tests (mock storage)
- MPI rank seeding uniqueness

### tests/test_s3dlio_object_store.py (new, 8 tests)
End-to-end integration tests against real MinIO (opt-in via env var):
- Full put + verify + get cycle for all 8 formats
- All 8/8 formats confirmed passing: npy, npz, hdf5, parquet, csv, jpeg, png,
  tfrecord

## Documentation

- docs/data_generator_analysis.md: implementation summary covering all bugs fixed,
  new readers added, test results, and file change inventory

* chore: remove backup file from docs

* fix: add dgen-py>=0.2.0 as required dependency

gen_random_tensor() raises RuntimeError when dgen-py is not installed
(used by IndexedBinaryGenerator and ParquetGenerator). Add to core_deps
in setup.py and requirements.txt so CI installs it automatically.

* fix: make dgen-py a soft dependency with numpy fallback

When dgen-py is not installed (e.g. Python 3.9 which dgen-py does not
support), gen_random_tensor() now logs a warning and falls back to numpy
instead of raising RuntimeError. dgen-py remains the fast default on
Python>=3.10 where it is available.

Also restrict the pip install marker to python_version>='3.10' in both
setup.py and requirements.txt.

* ci: drop Python 3.9 and 3.10 from matrix, require 3.11+

* ci: drop Python 3.9/3.10, add dgen-py to requirements-test.txt

* fix: soften dgen-py hard fail in config.py, fix CI cache key to include requirements-test.txt

* fix: reduce parallelism in failing tests due to over subscription, from np=4 to np=1

* Stabilize DFTracer CI and checkpoint tests on Python 3.12

Summary of changes:

- Limit CI Python matrix to 3.12 for deterministic DFTracer/runtime behavior.

- Restore AI logging test parallelism to match intended CI behavior: pytest -n 4.

- Add fail-fast CI preflight step that hard-fails on missing runtime imports:

  dftracer.python, dftracer.dftracer, dgen_py.

- Add pytest-timeout to setup.py test extras and requirements-test.txt so timeout markers are enforced.

- Fix pytest config table name in pyproject.toml: [tool.pytest.ini_options].

- Fix TensorFlow checkpoint path NameError by importing numpy and gen_random_tensor in tf_checkpointing.py.

- Remove hard mlpstorage dependency from DLIO packaging to avoid circular dependency.

- Add internal fallback backend (simple_streaming_checkpointing.py) for file/direct checkpoint save/load when mlpstorage is unavailable.

- Keep object-store checkpointing explicit: raise clear ImportError if mlpstorage is missing when object-store checkpoint backend is selected.

Local validation before commit:

- Python 3.12 env created and requirements installed.

- Targeted checkpoint smoke tests passed (PT/TF train-with-checkpoint + checkpoint-only): 4 passed.

- Full checkpoint subset passed with DFTracer enabled:

  pytest -n 1 -v tests/dlio_ai_logging_test.py -k 'train_with_checkpoint or checkpoint_only'

  Result: 24 passed.

- Full CI-equivalent AI logging command passed with DFTracer enabled:

  pytest tests/dlio_ai_logging_test.py -n 4 -v

  Result: 61 passed.

* CI: align dgen-py usage with 0.2.2 and Python 3.11+

- keep dftracer imports required but treat dgen_py preflight as optional warning

- bump dgen-py minimum to 0.2.2 in setup and requirements

- align dgen-py marker to python_version >= 3.11

* Tests: fix output glob path in checkpoint benchmark verification

Use os.path.join for *_output.json lookup so checkpoint tests detect rank output files correctly.

* Tests: harden output-path checks and checkpoint count arithmetic

- replace malformed output globs with os.path.join in benchmark helper tests

- use integer arithmetic for expected checkpoint file counts

* Tests: suppress Python 3.12 multiprocessing fork deprecation noise

Add targeted pytest filter for multiprocessing.popen_fork warning in multi-threaded MPI test processes.

* Tests: keep fork warnings visible and fix mocked S3 storage library config

- revert pytest warning suppression for multiprocessing fork deprecations

- set storage_options.storage_library=s3torchconnector in dlio_s3_benchmark_test fixture

* Tests: make object-storage coverage minimal by default

- keep small S3/AIStore smoke tests on by default

- gate heavy object-storage matrices behind DLIO_OBJECT_STORAGE_EXTENDED=1

- ensure S3 mocks cover s3torchconnector client path

- keep live s3dlio integration opt-in and reduce default format breadth

* Tests: gate object-storage suites behind opt-in flag

- disable S3 and AIStore mock tests unless DLIO_OBJECT_STORAGE_TESTS=1

- require DLIO_OBJECT_STORAGE_TESTS alongside DLIO_S3_INTEGRATION for live s3dlio tests

* Skip object storage tests cleanly

* Gate object storage CI steps

* docs: add I/O issues analysis and executive summary (2026-03-28)

Add comprehensive code review of DLIO benchmark I/O design issues covering:
- MPI sharding correctness (TFRecord iterative sampler bug)
- File vs. object storage reader asymmetry (correctness issue)
- JPEG/PNG generator overhead and DALI path analysis
- YAML config proliferation and proposed Hydra architecture
- read_threads sizing, MPI topology integration
- File/object rationalization proposal (Section 13)

Also add executive summary version targeting decision-makers,
with no code examples and links back to the full document.

* ci: remove S3TorchConnector tests from CI workflow

* fix: minio connection pool, s3torchconnector bool, obj_store fixes

- obj_store_lib.py: handle Python bool for s3_force_path_style (was
  crashing with s3torchconnector which passes actual booleans from YAML)
- obj_store_lib.py: urllib3 PoolManager maxsize=10 (fixes 'Connection
  pool is full, discarding connection' warnings with minio)
- _s3_iterable_mixin.py, parquet_reader_s3_iterable.py: reader fixes
- pytorch_checkpointing.py, pytorch_obj_store_checkpointing.py: fixes
- utility.py: storage utility improvements
- __init__.py: version/init updates

* Add parquet configuration options to ConfigArguments and LoadConfig (#9)

* Add parquet configuration options to ConfigArguments and LoadConfig

* Make record_size a dynamic property

* Optimize Parquet generation: pre-generate full table, zero-copy slice per row-group (#10)

Reduces generation call overhead from (num_batches × num_columns) to
num_columns per file. Arrow's table.slice() is zero-copy, so peak RAM
holds one full file's column data but eliminates repeated dgen-py/numpy
calls per batch.

Based on PR#10 from mlcommons/DLIO_local_changes (not yet merged upstream).

* Add uv support: [project] table in pyproject.toml + uv.lock

- Add PEP 621 [project] table to pyproject.toml for uv compatibility
- Add readme field to fix setuptools _long_description crash
- Add optional-dependencies and entry_points for uv extras
- setup.py left untouched for backward compatibility with pip/setuptools
- Add uv.lock for reproducible uv environments
- Add docs/DLIO_PR_Plan_2026-04-10.md: 8-PR implementation plan

* chore: ignore coderag index and fastembed cache directories

.coderag/       — local code-intelligence index generated by 'coderag index .'
.fastembed_cache/ — ONNX model weights downloaded by coderag on first run

Neither directory contains source code; both are regenerated on demand and
must not be committed to the repository.

* test: Batch E — test infra hardening, disable dftracer, spawn MP, fix warnings

- Disable dftracer entirely: no import, always-active no-op stubs in utility.py;
  remove dftracer globals/calls from main.py; configure_dftracer/finalize_dftracer
  become no-ops in config.py; set_dftracer_initialize/finalize kept as no-ops
- Change default multiprocessing_context from 'fork' to 'spawn' (avoids deadlocks
  in multi-threaded test processes); remove fork guard from configure_dlio_logging
- Fix pin_memory: AND with torch.cuda.is_available() so no UserWarning on CPU hosts
- Fix NumPy empty-slice warnings: guard io_save/duration_save stats with len() > 0
  check, consistent with existing io_load guard in statscounter.py
- Object-storage tests strictly opt-in via DLIO_OBJECT_STORAGE_TESTS=1 env var
- Add DALI skip guards; make dftracer optional in pyproject.toml/setup.py
- Fix DLIOMPI singleton reset in all test finalize() methods
- Fix generate_random_shape to use seeded RNG (deterministic)
- Remove dead duplicate OmegaConf call in test_npy_reader_compatibility
- Remove dlp_logger/dftracer finalizer from TorchDataset worker

* fix(readers+gen): PR-2/3 — local-FS reader parity + JPEG/PNG fast generation

PR-2: Skip CPU decode and add parallel prefetch to local-FS readers
  S3 iterable readers pre-fetch all files in parallel (queue depth=N) before
  the iteration loop. Local-FS readers were opening and decoding files one at
  a time (queue depth=1). This structural asymmetry makes local-FS benchmarks
  artificially slow relative to their physical bandwidth.

  New _LocalFSIterableMixin (_local_fs_iterable_mixin.py):
    - _localfs_prefetch_all(): ThreadPoolExecutor parallel reads before next()
    - Stores only raw byte count per file (same pattern as _S3IterableMixin)
    - No numpy / PIL / h5py decode — those allocate and immediately discard data

  Applied to: ImageReader, NPYReader, HDF5Reader, NPZReader
    - open() returns cached byte count instead of decoded array
    - get_sample() uses byte count directly for image_size telemetry
    - next() calls _localfs_prefetch_all() before iterating

PR-3: JPEG/PNG generator raw-bytes fast path (Option A)
  PIL encode costs ~30ms/file (JPEG) or ~100-200ms/file (PNG). Since PR-2
  readers now only measure raw byte counts and never decode content, the
  encode step is pure overhead that does not affect benchmark results.

  When data_loader != native_dali: write records.tobytes() directly.
    - ~1000-4000x faster for large synthetic datasets
    - Safe: readers in PR-2 never decode content
  When data_loader == native_dali: keep full PIL encode (fn.decoders.image()
    requires a valid JPEG/PNG bitstream).
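The fast path itself is a one-liner; a sketch, assuming `records` is the generator's uint8 image tensor:

```python
import numpy as np

# When readers only count bytes (PR-2), writing the raw tensor avoids
# the ~30-200 ms/file PIL encode entirely.
records = np.random.default_rng(0).integers(0, 255, size=(64, 64, 3),
                                            dtype=np.uint8)
raw = records.tobytes()          # bytes handed to storage.put_data()
```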

* fix(config): PR-1/4/5 — iterative sampler bug, multiprocessing_context auto-derive, read_threads auto-sizing

PR-1: build_sample_map_iter file-index reset for non-zero ranks
  The local sample_index counter was resetting file_index back to rank-0's
  partition on every iteration after the first. Fix: carry the rank offset
  (my_rank * files_per_rank) forward through all iterations so each rank
  stays in its own file partition.
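The corrected iteration can be sketched as follows (names are illustrative, not the real build_sample_map_iter signature):

```python
# Carry the rank offset forward on every pass so each rank stays
# inside its own file partition instead of resetting to rank 0's.
def sample_map_iter(my_rank, files_per_rank, samples_per_file, epochs):
    rank_offset = my_rank * files_per_rank
    for _ in range(epochs):
        for local_file in range(files_per_rank):
            file_index = rank_offset + local_file   # never resets to rank 0
            for s in range(samples_per_file):
                yield file_index, s

files = {f for f, _ in sample_map_iter(my_rank=1, files_per_rank=2,
                                       samples_per_file=3, epochs=2)}
assert files == {2, 3}   # rank 1 touches only its own files, every epoch
```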

PR-4: multiprocessing_context auto-derive from storage_library
  s3dlio and s3torchconnector initialize async runtimes at import time.
  fork()-based DataLoader workers inherit broken file-descriptors. Auto-set
  multiprocessing_context='spawn' when storage_library is one of these and
  the user has not overridden the default.

PR-5: read_threads auto-sizing
  Default read_threads=1 leaves I/O bandwidth on the table on modern NVMe
  and NVMe-oF systems. When read_threads==1 (the 'user did not set this'
  sentinel), auto-size to min(cpu_count // comm_size, 8) and log the choice.
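The heuristic can be sketched like this; function name and guard details are assumptions:

```python
import os

def auto_size_read_threads(read_threads, comm_size, cap=8):
    # read_threads == 1 is treated as the "user did not set this"
    # sentinel; any other value is an explicit choice and wins.
    if read_threads != 1:
        return read_threads
    cpus = os.cpu_count() or 1
    return max(1, min(cpus // max(comm_size, 1), cap))
```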

* test: fix test_npy_reader_compatibility for LocalFSIterableMixin design

NPYReader.open() returns int byte count on this branch (not ndarray) —
that is correct by design: _LocalFSIterableMixin skips decode and
caches only byte counts for storage-bandwidth benchmarking parity
with _S3IterableMixin. Update assertion to match actual contract and
verify file content directly via np.load.

* test: Batch E — test infra hardening, disable dftracer, spawn MP, fix warnings

- Disable dftracer entirely: no import, always-active no-op stubs in utility.py;
  remove dftracer globals/calls from main.py; configure_dftracer/finalize_dftracer
  become no-ops in config.py; set_dftracer_initialize/finalize kept as no-ops
- Change default multiprocessing_context from 'fork' to 'spawn' (avoids deadlocks
  in multi-threaded test processes); remove fork guard from configure_dlio_logging
- Fix pin_memory: AND with torch.cuda.is_available() so no UserWarning on CPU hosts
- Fix NumPy empty-slice warnings: guard io_save/duration_save stats with len() > 0
  check, consistent with existing io_load guard in statscounter.py
- Object-storage tests strictly opt-in via DLIO_OBJECT_STORAGE_TESTS=1 env var
- Add DALI skip guards; make dftracer optional in pyproject.toml/setup.py
- Fix DLIOMPI singleton reset in all test finalize() methods
- Fix generate_random_shape to use seeded RNG (deterministic)
- Remove dead duplicate OmegaConf call in test_npy_reader_compatibility
- Remove dlp_logger/dftracer finalizer from TorchDataset worker

* test: add PR verification benchmarks and report (April 12, 2026)

- bench_generation.py: JPEG/PNG fast-path vs PIL encode speedup
- bench_readers.py: reader parity baseline benchmark
- bench_readers2.py: decode cost isolation + parallel prefetch analysis
- bench_config_fixes.py: 16-case behavioral verification of config.py fixes
  (iterative sampler bug, multiprocessing_context auto-derive, read_threads auto-size)
- dlio_fix_verification_report.md: full results report for PR submissions

All scripts run via: uv run python tests/PRs-12-Apr-26/<script>.py

* fix: parallel generation, storage env vars, MPI topology, settle guard (Issues 9,10,11,12,13,6b)

PR-13 (Issue 12): Add ranks_per_node() to DLIOMPI; use it for read/write thread
auto-sizing instead of total comm_size so multi-node configs divide CPUs by
ranks-per-node, not total world size.

PR-14 (Issues 10+11+6b): Parallel data generation via ThreadPoolExecutor.
- Add write_threads field to ConfigArguments (default 1, auto-sized like read_threads)
- Rewrite _generate_files() with two-phase design: pre-derive RNG seeds sequentially
  (preserves determinism), then dispatch writes in parallel worker threads
- Each worker gets its own np.random.default_rng(seed) — no shared state
- Identical output with any write_threads value (determinism guaranteed)
- Add clarifying comment to DataGenerator.__init__ re: Issue 6b (non-bug confirm)
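The two-phase design can be sketched as below; the write callback and seed formula are illustrative, but the structure (sequential seed derivation, parallel writes, per-worker RNG) matches the description:

```python
from concurrent.futures import ThreadPoolExecutor
import numpy as np

def generate_files(num_files, write_fn, write_threads=4, base_seed=10):
    # Phase 1: derive all seeds sequentially, preserving determinism.
    seeds = [base_seed + i for i in range(num_files)]

    def worker(i):
        rng = np.random.default_rng(seeds[i])   # private RNG, no shared state
        write_fn(i, rng.integers(0, 255, size=16, dtype=np.uint8))

    # Phase 2: dispatch the actual writes in parallel.
    with ThreadPoolExecutor(max_workers=write_threads) as pool:
        list(pool.map(worker, range(num_files)))
```

Because seeds are fixed before any thread starts, the output is byte-identical for any write_threads value.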

PR-12 (Issue 9): Storage environment-variable overrides in _apply_env_overrides().
- DLIO_STORAGE_LIBRARY → storage_options['storage_library']
- DLIO_BUCKET          → storage_root (if unset)
- DLIO_STORAGE_TYPE    → storage_type (if unset)
- AWS_ACCESS_KEY_ID    → storage_options['access_key_id']
- AWS_SECRET_ACCESS_KEY→ storage_options['secret_access_key']
- AWS_ENDPOINT_URL     → storage_options['endpoint_url']
- AWS_REGION           → storage_options['region']
- YAML/CLI values always win (env vars only fill unset fields)
- Optional dotenv dict support with env vars taking priority
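The fill-only-if-unset precedence can be sketched like this (a subset of the mapping above; the real _apply_env_overrides also covers storage_root and storage_type):

```python
import os

def apply_env_overrides(storage_options):
    # Env vars only fill fields the YAML/CLI left unset.
    mapping = {
        "DLIO_STORAGE_LIBRARY": "storage_library",
        "AWS_ACCESS_KEY_ID": "access_key_id",
        "AWS_ENDPOINT_URL": "endpoint_url",
    }
    for env_var, key in mapping.items():
        value = os.environ.get(env_var)
        if value and key not in storage_options:   # YAML value always wins
            storage_options[key] = value
    return storage_options
```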

PR-15 (Issue 13): Post-generation settle guard for eventual-consistency stores.
- Add post_generation_settle_seconds: float = 0.0 to ConfigArguments
- Load from config['storage']['post_generation_settle_seconds'] in LoadConfig()
- Add _apply_settle_guard(args, comm) to main.py: rank-0 sleeps then barrier
- Guard is no-op for LOCAL_FS or when settle=0 (zero behaviour change by default)
- Call after generation barrier in DLIOBenchmark.initialize()

Tests: tests/test_remaining_issues.py — 28 tests, all pass

* fix: cap auto-sized thread count in tests and CI (DLIO_MAX_AUTO_THREADS)

The parallel generation feature (PR #12) auto-sizes write_threads and
read_threads to cpu_count // ranks_per_node, which on a 64-core dev box
produced 8 threads per test.  Running the full suite with that many threads
caused resource contention between concurrent mpirun subprocesses, leading
to intermittent failures (empty stdout/stderr, return code 1).

Changes:
- config.py: replace hardcoded cap of 8 with int(os.environ.get('DLIO_MAX_AUTO_THREADS', '8'))
  so the ceiling is overridable without code changes
- tests/conftest.py: set DLIO_MAX_AUTO_THREADS=2 via os.environ.setdefault so
  all tests (local and CI) run with a known bounded thread count
- .github/workflows/ci.yml: add DLIO_MAX_AUTO_THREADS: '2' to the job env
  block so GitHub Actions runners (2 vCPUs) are not over-committed

No behaviour change for production runs (default cap remains 8).

* chore: remove uv.lock, internal docs, and bench scripts from upstream PR

* chore: lock Python to 3.12 only (>=3.12,<3.13)

* fix: add pydftracer to setup.py test deps for Preflight C extension check

* fix: restore pydftracer to core_deps (was removed in earlier commit, breaks via-setup Preflight)

* fix: add pydftracer to pyproject.toml dependencies (pip ignores setup.py install_requires when [project] table exists)

* fix: add dftracer>=2.0.1 to pyproject.toml test extras (provides C extension dftracer.dftracer required by Preflight)

* fix: add pyproject.toml to CI cache key to prevent stale venv reuse

* fix: drop incomplete last batch in FormatReader.next() to match drop_last=True semantics

PyTorch DataLoader uses drop_last=True which discards the final incomplete
batch. The TF path goes through FormatReader.next() which was padding the last
batch to batch_size — causing extra fetch_iter events and failing assertions:

  expected_iters = num_epochs * (num_data_pp // batch_size)  # floor division

Removing the padding means an incomplete last batch is silently dropped (the
yield only fires when len(batch) == batch_size). This matches PyTorch behavior
and fixes test_ai_logging_train[tensorflow-*] failures.
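The drop_last=True semantics reduce to a yield guard; a minimal sketch of the pattern:

```python
def batches(samples, batch_size):
    # Yield only full batches; an incomplete final batch is silently
    # dropped, matching PyTorch DataLoader's drop_last=True.
    batch = []
    for s in samples:
        batch.append(s)
        if len(batch) == batch_size:
            yield batch
            batch = []
    # no trailing yield: the partial batch is discarded

assert len(list(batches(range(10), 3))) == 10 // 3   # 3 batches, not 4
```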

* fix: pin read_threads=1 in test_ai_logging_train to avoid per-thread batch count mismatch

Auto-threading (PR-5/PR-13) can set read_threads>1 in CI. With multiple TF
interleave threads, samples are partitioned per-thread, so each thread yields
floor(samples_per_thread/batch_size) batches — the sum is less than
floor(total_samples/batch_size), breaking the assertion:

  expected_iters = num_epochs * (num_data_pp // batch_size)

e.g. 10 samples, bs=2, read_threads=2:
  expected = 10//2 = 5 but actual = 2*(floor(5/2)) = 4

The test was written assuming single-threaded iteration. Pin read_threads=1
to match that assumption. test_ai_logging_train_with_step is unaffected as
it already parametrizes read_threads explicitly.
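The arithmetic above, as code: per-thread floor division can lose batches relative to the single-threaded count.

```python
def threaded_batches(total_samples, batch_size, read_threads):
    # Each thread gets an equal partition and floors independently.
    per_thread = total_samples // read_threads
    return read_threads * (per_thread // batch_size)

assert 10 // 2 == 5                         # single-threaded expectation
assert threaded_batches(10, 2, 2) == 4      # 2 * floor(5/2): one batch lost
```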

* fix: restore dftracer loading — conditional import from DFTRACER_ENABLE env var

The d9f175b commit hardcoded DFTRACER_ENABLE=False and replaced all dftracer
code with permanent no-op stubs. This broke dlio_ai_logging_test.py entirely,
since those tests require actual .pfw trace files to be written.

- utility.py: restore conditional dftracer.python import; fall back to no-op
  stubs only when the library is absent or raises ImportError
- config.py: restore configure_dftracer() to call PerfTrace.initialize_log()
  when DFTRACER_ENABLE is True; restore finalize_dftracer() to flush
- main.py: restore dftracer_initialize/finalize/dftracer globals, the call
  to configure_dftracer() in __init__(), and the finalize_dftracer() call
  in finalize(); restore set_dftracer_initialize/finalize to update globals

CI sets DFTRACER_ENABLE=1, which now causes the real dftracer C extension to
initialize and write .pfw trace files, allowing test_ai_logging to pass.

* fix: add missing global declarations for dftracer in __init__ and finalize()

Without 'global dftracer' in __init__, the assignment:
    dftracer = self.args.configure_dftracer(...)
created a local variable, leaving the module-level dftracer=None.
finalize() then saw None and never called finalize_dftracer().

TensorFlow calls os._exit() during shutdown, bypassing Python atexit
handlers — so without explicit finalize_dftracer(), the .pfw trace file
is never flushed. PyTorch exits cleanly so dftracer's atexit fires as a
fallback, which is why pytorch tests passed but tensorflow tests did not.
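The underlying Python pitfall is easy to reproduce in isolation:

```python
# Minimal reproduction: assignment without 'global' creates a local.
state = None

def set_state_broken():
    state = "initialized"       # local variable; module-level stays None

def set_state_fixed():
    global state                # assignment now targets the module scope
    state = "initialized"

set_state_broken()
assert state is None
set_state_fixed()
assert state == "initialized"
```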

* ci: remove test_ai_logging step — dftracer integration tests not needed

* ci: replace push/PR trigger with fast CI suite; keep integration tests as manual

- Rename ci.yml → integration.yml; change trigger to workflow_dispatch only
  so the full 21-suite run is manual-only (GitHub Actions UI).
- Add .github/workflows/fast-ci.yml: 3-leg matrix (via-uv, via-setup,
  via-reqs) that runs on every push and PR, completes in < 10 minutes.
- Add tests/test_fast_ci.py: 65-test fast CI suite covering preflight
  imports, enumerations, utilities, config defaults/derive, generator and
  reader factories, NPY/NPZ/HDF5/image I/O, MPI smoke (np=2), and an
  end-to-end generate+train pipeline.

* ci: add pyarrow to requirements-test.txt for via-reqs leg

pyarrow is a core dependency in pyproject.toml but was missing from
requirements-test.txt, causing test_pyarrow to fail in the via-reqs
matrix leg.

---------

Signed-off-by: Abhishek Gaikwad <gaikwadabhishek1997@gmail.com>
Signed-off-by: Denis Barakhtanov <dbarahtanov@enakta.com>
Co-authored-by: Darien Imai <941951+dpsi@users.noreply.github.com>
Co-authored-by: Izzet Yildirim <yildirim2@llnl.gov>
Co-authored-by: Abhishek Gaikwad <gaikwadabhishek1997@gmail.com>
Co-authored-by: enakta <140368024+enakta@users.noreply.github.com>
Co-authored-by: Denis Barakhtanov <denis.barahtanov@gmail.com>
Co-authored-by: Wolfgang De Salvador <118554802+wolfgang-desalvador@users.noreply.github.com>
@russfellows
Copy link
Copy Markdown

russfellows commented Apr 16, 2026

Merge Conflicts - Now re-implementing.

This IS a good change, but there are now merge conflicts. Thank you for the work, Wolfgang. Here is an analysis of the improvements using dgen-py streaming data generation.

The new dgen-py streaming method has linear memory usage, maxing out at 64 MB, whereas the old method required the entire Parquet object to reside in memory, potentially using GBs. The new streaming method is also about 2-3x faster.

However, this step is no longer the bottleneck. The problem has now moved to the slow PyArrow memory encoding, which will require another set of changes. I will update this PR branch to accommodate the new streaming method and list Wolfgang as a co-author.

It is probably easiest to just close this PR.

dgen-py 0.2.3 Data Generation Benchmark Results

Platform: 12 logical CPUs
dgen-py version: 0.2.3
Benchmark: tests/bench_dgen.py
Timing discipline: Generator/Pool creation is excluded from timing; only the fill call(s) are measured.


Method Summary

| | BEFORE | AFTER (< 32 MB) | AFTER (≥ 32 MB) |
|---|---|---|---|
| API | `Generator(size=N).get_chunk(N)` | `BufferPool().next_slice(N)` | `Generator(size=N)` + `fill_chunk` (64 MB loop) |
| Peak RAM | Full file size | Full batch size | 64 MB constant |
| Thread pool | New Rayon pool every call | None | Reused across all chunks |
| Data source | Full allocation + fill | Zero-copy slice from rolling pool | Sequential 64 MB chunks |
Results

  [1 MB]
    BEFORE  get_chunk:       0.001s 0.001s 0.001s  avg     972 MB/s   peak RAM ~1 MB
    AFTER   pool.next_slice: 0.000s 0.000s 0.000s  avg  457.10 GB/s   peak RAM ~1 MB
    Speedup: 470x   RAM savings: 1x

  [10 MB]
    BEFORE  get_chunk:       0.002s 0.002s 0.001s  avg    5.42 GB/s   peak RAM ~10 MB
    AFTER   pool.next_slice: 0.002s 0.002s 0.002s  avg    5.94 GB/s   peak RAM ~10 MB
    Speedup: 1.1x   RAM savings: 1x

  [100 MB]
    BEFORE  get_chunk:       0.009s 0.009s 0.010s  avg   11.23 GB/s   peak RAM ~100 MB
    AFTER   fill_chunk(64MB): 0.004s 0.003s 0.003s  avg   34.03 GB/s   peak RAM ~64 MB
    Speedup: 3.0x   RAM savings: 1x

  [1 GB]
    BEFORE  get_chunk:       0.065s 0.068s 0.063s  avg   16.43 GB/s   peak RAM ~1 GB
    AFTER   fill_chunk(64MB): 0.028s 0.027s 0.027s  avg   39.28 GB/s   peak RAM ~64 MB
    Speedup: 2.4x   RAM savings: 16x

  [10 GB]
    BEFORE  get_chunk:       0.574s 0.593s 0.624s  avg   17.99 GB/s   peak RAM ~10 GB
    AFTER   fill_chunk(64MB): 0.265s 0.265s 0.268s  avg   40.38 GB/s   peak RAM ~64 MB
    Speedup: 2.2x   RAM savings: 160x

Analysis

Throughput

For files ≥ 100 MB — the typical case for AI/ML training data — the new path delivers 2–3x higher throughput:

| File size | BEFORE | AFTER | Gain |
|---|---|---|---|
| 100 MB | 11 GB/s | 34 GB/s | 3.0x |
| 1 GB | 16 GB/s | 39 GB/s | 2.4x |
| 10 GB | 18 GB/s | 40 GB/s | 2.2x |

The BEFORE path plateaus at ~18 GB/s because a new Rayon thread pool is created on every get_chunk() call. Startup overhead dominates at small sizes and caps throughput at large ones.
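The same per-call pool-creation penalty can be reproduced in pure Python with `concurrent.futures` (the Rust code uses Rayon, but the cost structure is analogous): building a fresh pool for every chunk pays thread startup and teardown each time, while a reused pool pays it once.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def work(_):
    # Tiny task, so pool overhead dominates the measurement.
    return sum(range(1000))

def per_call_pool(n_calls: int) -> float:
    """BEFORE pattern: a fresh pool for every chunk request."""
    t0 = time.perf_counter()
    for _ in range(n_calls):
        with ThreadPoolExecutor(max_workers=8) as pool:
            list(pool.map(work, range(8)))
    return time.perf_counter() - t0

def reused_pool(n_calls: int) -> float:
    """AFTER pattern: create the pool once, reuse it for all chunks."""
    t0 = time.perf_counter()
    with ThreadPoolExecutor(max_workers=8) as pool:
        for _ in range(n_calls):
            list(pool.map(work, range(8)))
    return time.perf_counter() - t0

slow = per_call_pool(30)
fast = reused_pool(30)
```

On any machine, `fast` comes out well below `slow`: thread creation is a fixed cost per pool, which is exactly the overhead that dominates small chunks and caps throughput on large ones.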

The AFTER path stabilizes at ~40 GB/s and sustains it even at 10 GB. That plateau is the machine's memory bandwidth limit, not a dgen-py limit.

Memory

The RAM reduction is the more important improvement for large files:

  File size   BEFORE   AFTER   Reduction
  1 GB        1 GB     64 MB   16x
  10 GB       10 GB    64 MB   160x

With the old API, generating a single 10 GB file required 10+ GB of RAM to be live simultaneously. This made large-file generation impractical on memory-constrained nodes and completely impossible if multiple MPI ranks shared a node.

With the new API, peak RAM is always 64 MB, regardless of file size. An 8-rank node generating 10 GB files previously required 80+ GB of RAM. Now it requires ~512 MB total.

Small file note (1 MB)

The reported 470x speedup at 1 MB is an artifact of Rayon thread pool startup latency (~1 ms), which dominates the measurement for sub-2ms operations. Both paths produce the same amount of data; the BEFORE path simply pays a fixed pool-creation cost that is disproportionate at this scale. At typical training file sizes (100 MB–10 GB), the real speedup is 2–3x.

10 MB boundary behavior

At 10 MB, both methods measure ~5–6 GB/s with near-identical times. This size sits just below the 32 MB BufferPool threshold, so the AFTER path uses next_slice(), which is fast but similarly bounded. No meaningful difference at this scale — both are well below the memory bandwidth ceiling.
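The size-based dispatch described above can be sketched in a few lines; the 32 MB threshold and the two path names follow the Method Summary table, while the function itself is illustrative rather than the dgen-py API:

```python
POOL_THRESHOLD = 32 * 1024 * 1024  # below this, serve from the rolling BufferPool

def choose_path(size: int) -> str:
    """Mirror the AFTER dispatch: small requests are zero-copy slices
    from a pre-filled pool; large ones stream through the 64 MB fill loop."""
    return "pool.next_slice" if size < POOL_THRESHOLD else "fill_chunk(64MB)"
```

Under this rule a 10 MB request stays on the slice path and a 100 MB request switches to the streaming loop, which matches the method labels in the results above.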


Key Takeaways

  1. Generation is no longer the bottleneck. At 40 GB/s, data generation is 20–100x faster than any disk or object storage system. The bottleneck has fully shifted to Parquet encoding and I/O.

  2. Constant 64 MB peak RAM enables very large files. A node with 256 GB RAM can generate arbitrarily large files with negligible memory pressure.

  3. MPI scaling is unaffected. Each rank still generates its file slice independently. The RAM savings multiply by rank count: 8 ranks on one node previously needed 8× full-file RAM; now they need 8 × 64 MB = 512 MB total.
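The per-node arithmetic in the takeaways can be checked directly; this only restates the numbers above:

```python
MB = 1024 * 1024
GB = 1024 * MB

ranks = 8
file_size = 10 * GB   # each rank generates a 10 GB file
chunk = 64 * MB       # constant working set in the new path

before = ranks * file_size  # old API: each rank holds its full file in RAM
after = ranks * chunk       # new API: each rank holds one 64 MB buffer

# before == 80 GB, after == 512 MB, and the ratio is the 160x
# reduction reported in the 10 GB row of the memory table.
```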
