feat: add state materialization across regions#4490
feat: add state materialization across regions#4490aglinxinyuan wants to merge 22 commits intomainfrom
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #4490 +/- ##
============================================
- Coverage 42.71% 42.70% -0.01%
- Complexity 2185 2187 +2
============================================
Files 1032 1006 -26
Lines 38161 37524 -637
Branches 4006 3923 -83
============================================
- Hits 16301 16026 -275
+ Misses 20840 20524 -316
+ Partials 1020 974 -46
*This pull request uses carry forward flags. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Adds 9 unit tests targeting the codecov-flagged gaps in PR #4490: - InputPortMaterializationReaderRunnable.run() inner state-reading try-block, including the missing-state-document path (ValueError swallow). - DocumentFactory.create_document / open_document namespace routing for STATE and RESULT, plus the unsupported-resource-type and missing-table error paths. Iceberg dependencies are mocked at the document_factory import site so the tests run without Postgres. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
This PR introduces state materialization as a first-class storage artifact alongside result materialization so that operator state can be persisted and later replayed when execution crosses region boundaries, using a shared serialization format across Scala/Java and Python.
Changes:
- Add a new VFS resource type (
state) and route it to a dedicated Iceberg namespace/config on both Scala and Python sides. - Persist state rows (serialized) into a separate Iceberg table per operator output, and read them back during input-port materialization.
- Add round-trip tests for state materialization in both Scala and Python.
Reviewed changes
Copilot reviewed 24 out of 24 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala | Adds Scala tests for round-tripping materialized state rows via Iceberg. |
| common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala | Introduces STATE as a new VFS resource type. |
| common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala | Routes STATE resource type to a configurable Iceberg namespace (create/open). |
| common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala | Adds helper to derive a state URI from a result URI. |
| common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala | Adds icebergTableStateNamespace configuration and env var constant. |
| common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala | Adds env var constant for state namespace. |
| common/config/src/main/resources/storage.conf | Adds storage.iceberg.table.state-namespace default + env override. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala | Reads materialized state table and emits StateFrames to the input queue. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala | Attempts to persist state when processing state messages. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala | Creates the state table alongside the result table during scheduling/setup. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala | Passes state namespace into the Python worker process args. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala | Adds state persistence to storage for output ports with materialization enabled. |
| amber/src/main/python/texera_run_python_worker.py | Extends CLI arg parsing to accept state namespace and initialize StorageConfig accordingly. |
| amber/src/main/python/pytexera/storage/test_large_binary_manager.py | Updates test StorageConfig initialization to include the state namespace. |
| amber/src/main/python/core/storage/vfs_uri_factory.py | Adds STATE as a Python VFS resource type. |
| amber/src/main/python/core/storage/test_document_factory.py | Adds Python unit tests verifying namespace routing for RESULT vs STATE. |
| amber/src/main/python/core/storage/storage_config.py | Adds ICEBERG_TABLE_STATE_NAMESPACE and wires it into initialization. |
| amber/src/main/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py | Adds Python tests for emitting state frames and state-table read behavior. |
| amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py | Reads and emits state materialization rows before tuple materialization. |
| amber/src/main/python/core/storage/iceberg/test_iceberg_document.py | Adds Python integration-ish tests for round-tripping materialized state rows. |
| amber/src/main/python/core/storage/document_factory.py | Routes STATE to the state namespace for create/open. |
| amber/src/main/python/core/models/state.py | Adds helper to derive a state URI from a result URI. |
| amber/src/main/python/core/architecture/packaging/test_output_manager.py | Adds unit tests for Python OutputManager state persistence behavior. |
| amber/src/main/python/core/architecture/packaging/output_manager.py | Adds state persistence API and records storage URIs for later state writes. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Adds a state-materialization path alongside the existing tuple-result storage. State produced by an operator's processState is written to a companion Iceberg table whose URI is derived from the result URI. The input-port materialization reader replays both tuples and states into downstream workers. Key pieces: - New STATE resource type and a state-namespace storage config entry on both Python and Scala sides; namespaces are read from StorageConfig instead of hardcoded strings. - RegionExecutionCoordinator provisions a state document next to every result document at scheduling time, so readers and writers can rely on its presence without try/catch. - One long-lived BufferedItemWriter per output port, opened at port setup and closed at port completion, so a single Iceberg snapshot is produced per port instead of one per state. - DataProcessor.processInputState (Scala) and MainLoop.process_input_state (Python) persist the executor's *output* state, matching the state that is also emitted downstream. - New Python and Scala unit tests covering the State JSON wire format, the OutputManager state-writer lifecycle, the reader's state-replay block, and DocumentFactory namespace routing. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
10f243f to
581d574
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 26 out of 26 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 25 out of 25 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Xiao-zhen-Liu
left a comment
There was a problem hiding this comment.
Left some comments.
Address PR #4490 review comment 3192875005: document why the input-port materialization reader replays states before tuples (downstream operators typically need their state in place before processing the incoming tuples). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Address PR #4490 review comment 3192889029: explain why the state loop intentionally enqueues every row to every downstream worker while the tuple loop filters by partitioner -- state is shared context, not per-key data. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Address PR #4490 review comment 3192916602: writing the same state row to every output port's state table mirrors the broadcast-to-all-workers behavior on the emit side -- state is shared context, not per-key data, so every downstream operator needs the full set. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…eader Address PR #4490 review comment 3192901027: the partitioner detour in the Python reader was a no-op (every worker is supposed to see every state, so the broadcast-then-filter round-trip just reduced back to the input). Emit StateFrame(State.from_tuple(row)) directly in run(), matching the Scala reader. Test class TestEmitStateWithFilter is dropped; the run() block test asserts partitioner.flush_state is not called. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Drop the result-as-primary asymmetry in VFSURIFactory: ports now build a single base URI, with result and state URIs as equal first-class derivatives via resultURI(base) / stateURI(base). Removes the substring-replace `siblingStateURI` helper and the asymmetric createResultURI / createStateURI pair.
|
All comments have been addressed. @Xiao-zhen-Liu, please review again. |
Xiao-zhen-Liu
left a comment
There was a problem hiding this comment.
LGTM now, left a few minor comments. BTW is there any way to manually test your feature on some workflow (and have you tested the latest code yourself?)?
| case None => | ||
| } | ||
|
|
||
| this.stateWriterThreads.remove(outputPortId).foreach { writerThread => |
There was a problem hiding this comment.
The result-writer cleanup directly above this calls writerThread.getFailure.foreach(throw _) so an Iceberg commit failure surfaces as a FatalError to the controller (Recently done in #4683.) The state-writer cleanup uses the same OutputPortResultWriterThread but skips the getFailure check, so a state write failure here is silently swallowed and the worker still announces port completion as if state was durably written. Please mirror the failure-rethrow on the state side too.
| : mutable.HashMap[PortIdentity, OutputPortResultWriterThread] = | ||
| mutable.HashMap() | ||
|
|
||
| private val stateWriterThreads: mutable.HashMap[PortIdentity, OutputPortResultWriterThread] = |
There was a problem hiding this comment.
Rename OutputPortResultWriterThread to OutputPortStorageWriterThread since it is not just used for result now?
| @@ -568,18 +569,21 @@ class RegionExecutionCoordinator( | |||
| ): Unit = { | |||
| portConfigs.foreach { | |||
| case (outputPortId, portConfig) => | |||
| val storageUriToAdd = portConfig.storageURI | |||
| val (_, eid, _, _) = decodeURI(storageUriToAdd) | |||
| val portBaseURI = portConfig.storageURI | |||
| val resultURI = VFSURIFactory.resultURI(portBaseURI) | |||
| val stateURI = VFSURIFactory.stateURI(portBaseURI) | |||
| val schemaOptional = | |||
| region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3 | |||
| val schema = | |||
| schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing")) | |||
| DocumentFactory.createDocument(storageUriToAdd, schema) | |||
| DocumentFactory.createDocument(resultURI, schema) | |||
| DocumentFactory.createDocument(stateURI, State.schema) | |||
| if (!isRestart) { | |||
| val (_, eid, _, _) = decodeURI(resultURI) | |||
| WorkflowExecutionsResource.insertOperatorPortResultUri( | |||
| eid = eid, | |||
| globalPortId = outputPortId, | |||
| uri = storageUriToAdd | |||
| uri = resultURI | |||
| ) | |||
| } | |||
| } | |||
There was a problem hiding this comment.
Nit: The name storageURI is misleading now — it sounds like the URI you'd pass straight into DocumentFactory.openDocument, but it's actually the base that resultURI and stateURI derive from. Worth renaming the field on OutputPortConfig (and the matching parameter on OutputManager.addPort) to storageURIBase to signal it isn't the final URI.
What changes were proposed in this PR?
This PR adds state materialization as a general mechanism for passing state across different regions.
Any related issues, documentation, discussions?
Closes #4489
How was this PR tested?
Was this PR authored or co-authored using generative AI tooling?
Generated-by: ChatGPT (Codex), Claude Code (Claude Opus 4.7)