feat: add state materialization across regions#4490

Open
aglinxinyuan wants to merge 22 commits into main from xinyuan-state-materialization

Conversation

@aglinxinyuan
Contributor

@aglinxinyuan aglinxinyuan commented Apr 24, 2026

What changes were proposed in this PR?

This PR adds state materialization as a general mechanism for passing state across different regions.

  • materialize state as a separate storage object alongside result storage
  • store one serialized state per row
  • use the same cross-language format on Python and Scala/Java sides
  • let downstream regions read back both data and state when needed
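The "one serialized state per row" idea can be illustrated with a minimal sketch. It assumes a JSON encoding stored in a single column; the `State` class, the `content` column name, and the in-memory list standing in for the state table are hypothetical and not taken from the PR.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class State:
    """Hypothetical stand-in for an operator state: a flat key/value mapping."""

    values: Dict[str, Any] = field(default_factory=dict)

    def to_row(self) -> Dict[str, str]:
        # One state becomes exactly one row with a single serialized column.
        # sort_keys keeps the encoding deterministic regardless of insertion order.
        return {"content": json.dumps(self.values, sort_keys=True)}

    @classmethod
    def from_row(cls, row: Dict[str, str]) -> "State":
        # Reading is the inverse: parse the serialized column back into a mapping.
        return cls(json.loads(row["content"]))


# An in-memory list stands in for the state table (assumption, not Iceberg).
state_table: List[Dict[str, str]] = []
state_table.append(State({"threshold": 0.5, "seen": 3}).to_row())
state_table.append(State({"threshold": 0.7, "seen": 9}).to_row())

# A downstream region reads the rows back and rebuilds the states.
restored = [State.from_row(row) for row in state_table]
```

Because the row holds only a language-neutral string, a writer in one language and a reader in another need to agree only on the encoding, not on an in-memory representation.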

Any related issues, documentation, discussions?

Closes #4489

How was this PR tested?

  • added a Python round-trip test for materialized state storage in test_iceberg_document.py
  • added a Scala round-trip test for materialized state storage in IcebergDocumentSpec.scala

Was this PR authored or co-authored using generative AI tooling?

Generated-by: ChatGPT (Codex), Claude Code (Claude Opus 4.7)

@aglinxinyuan aglinxinyuan self-assigned this Apr 24, 2026
@aglinxinyuan aglinxinyuan changed the title feat: add state materialization across regions wip: feat: add state materialization across regions Apr 24, 2026
@aglinxinyuan aglinxinyuan changed the base branch from main to xinyuan-state-only April 24, 2026 05:07
@aglinxinyuan aglinxinyuan marked this pull request as ready for review April 24, 2026 05:08
@aglinxinyuan aglinxinyuan changed the title wip: feat: add state materialization across regions feat: add state materialization across regions Apr 24, 2026
@aglinxinyuan aglinxinyuan reopened this Apr 28, 2026
Base automatically changed from xinyuan-state-only to main May 1, 2026 06:10
@codecov-commenter

codecov-commenter commented May 2, 2026

Codecov Report

❌ Patch coverage is 82.10526% with 17 lines in your changes missing coverage. Please review.
✅ Project coverage is 42.70%. Comparing base (b9bbf0d) to head (3f586de).

Files with missing lines | Patch % | Lines
...thon/core/architecture/packaging/output_manager.py | 63.63% | 8 Missing ⚠️
...anagers/InputPortMaterializationReaderThread.scala | 50.00% | 3 Missing and 1 partial ⚠️
...ne/architecture/messaginglayer/OutputManager.scala | 83.33% | 2 Missing ⚠️
...he/texera/amber/core/storage/DocumentFactory.scala | 0.00% | 0 Missing and 2 partials ⚠️
...chitecture/pythonworker/PythonWorkflowWorker.scala | 0.00% | 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #4490      +/-   ##
============================================
- Coverage     42.71%   42.70%   -0.01%     
- Complexity     2185     2187       +2     
============================================
  Files          1032     1006      -26     
  Lines         38161    37524     -637     
  Branches       4006     3923      -83     
============================================
- Hits          16301    16026     -275     
+ Misses        20840    20524     -316     
+ Partials       1020      974      -46     
Flag | Coverage Δ | *Carryforward flag
access-control-service | 39.53% <ø> (ø) |
agent-service | 33.72% <ø> (ø) | Carriedforward from d3bb766
amber | 43.23% <70.96%> (+0.04%) ⬆️ |
computing-unit-managing-service | 0.00% <ø> (ø) |
config-service | 0.00% <ø> (ø) |
file-service | 32.18% <ø> (ø) |
frontend | 32.26% <ø> (-0.83%) ⬇️ | Carriedforward from d3bb766
python | 89.22% <87.50%> (+0.32%) ⬆️ |
workflow-compiling-service | 47.72% <ø> (ø) |

*This pull request uses carry forward flags. Click here to find out more.


aglinxinyuan added a commit that referenced this pull request May 3, 2026
Adds 9 unit tests targeting the codecov-flagged gaps in PR #4490:

- InputPortMaterializationReaderRunnable.run() inner state-reading
  try-block, including the missing-state-document path (ValueError
  swallow).
- DocumentFactory.create_document / open_document namespace routing
  for STATE and RESULT, plus the unsupported-resource-type and
  missing-table error paths. Iceberg dependencies are mocked at the
  document_factory import site so the tests run without Postgres.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings May 5, 2026 00:40
Contributor

Copilot AI left a comment

Pull request overview

This PR introduces state materialization as a first-class storage artifact alongside result materialization so that operator state can be persisted and later replayed when execution crosses region boundaries, using a shared serialization format across Scala/Java and Python.

Changes:

  • Add a new VFS resource type (state) and route it to a dedicated Iceberg namespace/config on both Scala and Python sides.
  • Persist state rows (serialized) into a separate Iceberg table per operator output, and read them back during input-port materialization.
  • Add round-trip tests for state materialization in both Scala and Python.
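The namespace routing described in the first bullet can be sketched as a lookup keyed by resource type. This is an illustration only: the `ResourceType` enum, the `namespace_for` helper, and the namespace strings are hypothetical; per the PR, the real values are read from StorageConfig.

```python
from enum import Enum


class ResourceType(Enum):
    """Hypothetical mirror of the VFS resource types mentioned above."""

    RESULT = "result"
    STATE = "state"


# Placeholder namespace names (assumption); the PR reads these from StorageConfig.
NAMESPACES = {
    ResourceType.RESULT: "example-result-namespace",
    ResourceType.STATE: "example-state-namespace",
}


def namespace_for(resource_type):
    """Return the table namespace for a resource type, rejecting unknown types."""
    try:
        return NAMESPACES[resource_type]
    except KeyError:
        raise ValueError(f"Unsupported resource type: {resource_type!r}") from None


result_namespace = namespace_for(ResourceType.RESULT)
state_namespace = namespace_for(ResourceType.STATE)
```

Keeping the mapping in one place means the create and open paths cannot disagree about which namespace a state table lives in.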

Reviewed changes

Copilot reviewed 24 out of 24 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala Adds Scala tests for round-tripping materialized state rows via Iceberg.
common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala Introduces STATE as a new VFS resource type.
common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala Routes STATE resource type to a configurable Iceberg namespace (create/open).
common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala Adds helper to derive a state URI from a result URI.
common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala Adds icebergTableStateNamespace configuration and env var constant.
common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala Adds env var constant for state namespace.
common/config/src/main/resources/storage.conf Adds storage.iceberg.table.state-namespace default + env override.
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala Reads materialized state table and emits StateFrames to the input queue.
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala Attempts to persist state when processing state messages.
amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala Creates the state table alongside the result table during scheduling/setup.
amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala Passes state namespace into the Python worker process args.
amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala Adds state persistence to storage for output ports with materialization enabled.
amber/src/main/python/texera_run_python_worker.py Extends CLI arg parsing to accept state namespace and initialize StorageConfig accordingly.
amber/src/main/python/pytexera/storage/test_large_binary_manager.py Updates test StorageConfig initialization to include the state namespace.
amber/src/main/python/core/storage/vfs_uri_factory.py Adds STATE as a Python VFS resource type.
amber/src/main/python/core/storage/test_document_factory.py Adds Python unit tests verifying namespace routing for RESULT vs STATE.
amber/src/main/python/core/storage/storage_config.py Adds ICEBERG_TABLE_STATE_NAMESPACE and wires it into initialization.
amber/src/main/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py Adds Python tests for emitting state frames and state-table read behavior.
amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py Reads and emits state materialization rows before tuple materialization.
amber/src/main/python/core/storage/iceberg/test_iceberg_document.py Adds Python integration-ish tests for round-tripping materialized state rows.
amber/src/main/python/core/storage/document_factory.py Routes STATE to the state namespace for create/open.
amber/src/main/python/core/models/state.py Adds helper to derive a state URI from a result URI.
amber/src/main/python/core/architecture/packaging/test_output_manager.py Adds unit tests for Python OutputManager state persistence behavior.
amber/src/main/python/core/architecture/packaging/output_manager.py Adds state persistence API and records storage URIs for later state writes.

Comment thread amber/src/main/python/core/architecture/packaging/output_manager.py
Adds a state-materialization path alongside the existing tuple-result
storage. State produced by an operator's processState is written to a
companion Iceberg table whose URI is derived from the result URI. The
input-port materialization reader replays both tuples and states into
downstream workers.

Key pieces:

- New STATE resource type and a state-namespace storage config entry
  on both Python and Scala sides; namespaces are read from
  StorageConfig instead of hardcoded strings.
- RegionExecutionCoordinator provisions a state document next to every
  result document at scheduling time, so readers and writers can rely
  on its presence without try/catch.
- One long-lived BufferedItemWriter per output port, opened at port
  setup and closed at port completion, so a single Iceberg snapshot is
  produced per port instead of one per state.
- DataProcessor.processInputState (Scala) and MainLoop.process_input_state
  (Python) persist the executor's *output* state, matching the state
  that is also emitted downstream.
- New Python and Scala unit tests covering the State JSON wire format,
  the OutputManager state-writer lifecycle, the reader's state-replay
  block, and DocumentFactory namespace routing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
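The "one long-lived writer per output port" point can be illustrated with a toy model that counts commits. Everything here is a stand-in: `CountingTable` and `BufferedWriter` are hypothetical in-memory classes, and a real buffered writer may also flush when its buffer fills, which this sketch ignores.

```python
class CountingTable:
    """In-memory stand-in for a table; each non-empty commit counts as one snapshot."""

    def __init__(self):
        self.rows = []
        self.commits = 0

    def commit(self, batch):
        if batch:
            self.rows.extend(batch)
            self.commits += 1


class BufferedWriter:
    """Hypothetical writer that buffers rows and commits them once on close."""

    def __init__(self, table):
        self._table = table
        self._buffer = []

    def put(self, row):
        self._buffer.append(row)

    def close(self):
        self._table.commit(self._buffer)
        self._buffer = []


def write_one_writer_per_state(table, states):
    # Opening and closing a writer for every state yields one commit per state.
    for state in states:
        writer = BufferedWriter(table)
        writer.put(state)
        writer.close()


def write_one_writer_per_port(table, states):
    # A single writer opened at port setup and closed at port completion
    # yields one commit for the whole port.
    writer = BufferedWriter(table)
    for state in states:
        writer.put(state)
    writer.close()


states = ["s1", "s2", "s3"]
per_state_table, per_port_table = CountingTable(), CountingTable()
write_one_writer_per_state(per_state_table, states)
write_one_writer_per_port(per_port_table, states)
```

Both tables end up with the same rows; only the number of commits differs, which is the cost the long-lived writer is meant to avoid.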
@aglinxinyuan aglinxinyuan force-pushed the xinyuan-state-materialization branch from 10f243f to 581d574 Compare May 5, 2026 06:01
Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 26 out of 26 changed files in this pull request and generated 1 comment.


aglinxinyuan and others added 2 commits May 5, 2026 19:08
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@aglinxinyuan aglinxinyuan requested a review from Copilot May 6, 2026 02:22
Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 25 out of 25 changed files in this pull request and generated 2 comments.


Comment thread amber/src/main/python/core/models/state.py Outdated
Contributor

@Xiao-zhen-Liu Xiao-zhen-Liu left a comment

Left some comments.

Comment thread amber/src/main/python/core/models/state.py Outdated
aglinxinyuan and others added 11 commits May 5, 2026 21:38
Address PR #4490 review comment 3192875005: document why the
input-port materialization reader replays states before tuples
(downstream operators typically need their state in place before
processing the incoming tuples).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Address PR #4490 review comment 3192889029: explain why the state
loop intentionally enqueues every row to every downstream worker
while the tuple loop filters by partitioner -- state is shared
context, not per-key data.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Address PR #4490 review comment 3192916602: writing the same state
row to every output port's state table mirrors the
broadcast-to-all-workers behavior on the emit side -- state is shared
context, not per-key data, so every downstream operator needs the
full set.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
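The replay behavior described in the three commit messages above (states first, states broadcast to every worker, tuples routed by a partitioner) can be sketched as follows. This is a simplified, centralized model: the `replay` function and its parameters are hypothetical, whereas in the PR each worker's reader performs the filtering itself.

```python
def replay(states, tuples, num_workers, partition):
    """Build one input queue per downstream worker from materialized rows.

    `partition` is a hypothetical function mapping a tuple to an integer;
    the tuple goes to worker `partition(t) % num_workers`.
    """
    queues = [[] for _ in range(num_workers)]

    # States are replayed first so each worker has its state in place
    # before it processes any tuple.
    for state in states:
        # State is shared context, so every worker receives every state.
        for worker_queue in queues:
            worker_queue.append(("state", state))

    # Tuples are per-key data, so each one goes to exactly one worker.
    for t in tuples:
        queues[partition(t) % num_workers].append(("tuple", t))

    return queues


queues = replay(
    states=["s1"],
    tuples=[0, 1, 2, 3],
    num_workers=2,
    partition=lambda t: t,
)
```

With two workers and an identity partitioner, both queues start with the same state entry, while even tuples land on worker 0 and odd tuples on worker 1.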
…eader

Address PR #4490 review comment 3192901027: the partitioner detour
in the Python reader was a no-op (every worker is supposed to see
every state, so the broadcast-then-filter round-trip just reduced
back to the input). Emit StateFrame(State.from_tuple(row)) directly
in run(), matching the Scala reader.

Test class TestEmitStateWithFilter is dropped; the run() block test
asserts partitioner.flush_state is not called.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Drop the result-as-primary asymmetry in VFSURIFactory: ports now build
a single base URI, with result and state URIs as equal first-class
derivatives via resultURI(base) / stateURI(base). Removes the
substring-replace `siblingStateURI` helper and the asymmetric
createResultURI / createStateURI pair.
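The base-URI design in this commit can be sketched in a few lines. The URI layout below is invented for illustration; only the idea that result and state URIs are symmetric derivatives of one base comes from the commit message, and `result_uri` / `state_uri` are hypothetical Python counterparts of `resultURI(base)` / `stateURI(base)`.

```python
def result_uri(base: str) -> str:
    """Derive the result URI from a port's base URI (hypothetical layout)."""
    return f"{base.rstrip('/')}/result"


def state_uri(base: str) -> str:
    """Derive the state URI from the same base, symmetric with result_uri."""
    return f"{base.rstrip('/')}/state"


# Hypothetical base URI for one output port; the real format is not shown here.
port_base = "vfs:///example/wid/1/eid/2/opid/op-a/portid/0"

port_result_uri = result_uri(port_base)
port_state_uri = state_uri(port_base)
```

Deriving both URIs from a shared base avoids the substring replacement that the removed helper relied on, since neither URI has to be parsed to obtain the other.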
@aglinxinyuan aglinxinyuan requested a review from Xiao-zhen-Liu May 7, 2026 00:58
@aglinxinyuan
Contributor Author

All comments have been addressed. @Xiao-zhen-Liu, please review again.

Contributor

@Xiao-zhen-Liu Xiao-zhen-Liu left a comment

LGTM now; I left a few minor comments. By the way, is there a way to test this feature manually on a workflow, and have you tested the latest code yourself?

case None =>
}

this.stateWriterThreads.remove(outputPortId).foreach { writerThread =>
Contributor

The result-writer cleanup directly above this calls writerThread.getFailure.foreach(throw _), so an Iceberg commit failure surfaces as a FatalError to the controller (recently done in #4683). The state-writer cleanup uses the same OutputPortResultWriterThread but skips the getFailure check, so a state write failure here is silently swallowed and the worker still announces port completion as if the state had been durably written. Please mirror the failure rethrow on the state side too.
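The pattern requested here, capturing a failure inside the writer thread and rethrowing it during cleanup, can be sketched in Python. `WriterThread` is a hypothetical stand-in and not the OutputPortResultWriterThread class discussed above; the failing write function simulates a commit error.

```python
import queue
import threading


class WriterThread(threading.Thread):
    """Hypothetical background writer that records a failure instead of losing it."""

    _STOP = object()  # sentinel that tells the thread to finish

    def __init__(self, write):
        super().__init__(daemon=True)
        self._write = write
        self._queue = queue.Queue()
        self.failure = None

    def put(self, item):
        self._queue.put(item)

    def run(self):
        try:
            while True:
                item = self._queue.get()
                if item is self._STOP:
                    return
                self._write(item)
        except Exception as exc:
            # Remember the failure so the owner can surface it at cleanup time.
            self.failure = exc

    def close(self):
        self._queue.put(self._STOP)
        self.join()
        # Rethrow on the caller's thread; skipping this check would swallow the error.
        if self.failure is not None:
            raise self.failure


def failing_commit(item):
    raise IOError("simulated commit failure")


writer = WriterThread(failing_commit)
writer.start()
writer.put({"state": 1})
try:
    writer.close()
    surfaced = None
except IOError as exc:
    surfaced = exc
```

If `close()` only joined the thread, the caller would proceed as though the write had succeeded, which is the silent-swallow case the comment describes.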

: mutable.HashMap[PortIdentity, OutputPortResultWriterThread] =
mutable.HashMap()

private val stateWriterThreads: mutable.HashMap[PortIdentity, OutputPortResultWriterThread] =
Contributor

Rename OutputPortResultWriterThread to OutputPortStorageWriterThread, since it is no longer used only for results?

Comment on lines 21 to 589
@@ -568,18 +569,21 @@ class RegionExecutionCoordinator(
   ): Unit = {
     portConfigs.foreach {
       case (outputPortId, portConfig) =>
-        val storageUriToAdd = portConfig.storageURI
-        val (_, eid, _, _) = decodeURI(storageUriToAdd)
+        val portBaseURI = portConfig.storageURI
+        val resultURI = VFSURIFactory.resultURI(portBaseURI)
+        val stateURI = VFSURIFactory.stateURI(portBaseURI)
         val schemaOptional =
           region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
         val schema =
           schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing"))
-        DocumentFactory.createDocument(storageUriToAdd, schema)
+        DocumentFactory.createDocument(resultURI, schema)
+        DocumentFactory.createDocument(stateURI, State.schema)
         if (!isRestart) {
+          val (_, eid, _, _) = decodeURI(resultURI)
           WorkflowExecutionsResource.insertOperatorPortResultUri(
             eid = eid,
             globalPortId = outputPortId,
-            uri = storageUriToAdd
+            uri = resultURI
           )
         }
     }
Contributor

Nit: The name storageURI is misleading now — it sounds like the URI you'd pass straight into DocumentFactory.openDocument, but it's actually the base that resultURI and stateURI derive from. Worth renaming the field on OutputPortConfig (and the matching parameter on OutputManager.addPort) to storageURIBase to signal it isn't the final URI.

Development

Successfully merging this pull request may close these issues.

Support state materialization across regions

4 participants