Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions Docs/durable-execution-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -1627,15 +1627,29 @@ public class CallbackTimeoutException : CallbackException { }
public class CallbackSubmitterException : CallbackException { }

/// <summary>
/// Thrown when an invoked function fails.
/// Base exception for chained-invoke failures. Catch <c>InvokeException</c>
/// to handle every non-success terminal state uniformly, or pattern-match the
/// concrete subclasses (<c>InvokeFailedException</c>, <c>InvokeTimedOutException</c>,
/// <c>InvokeStoppedException</c>) to react differently to specific outcomes.
/// Mirrors the Java SDK's invoke exception tree.
/// </summary>
public class InvokeException : DurableExecutionException
{
public string? FunctionName { get; }
public string? ErrorType { get; }
public string? ErrorData { get; }
public string? FunctionName { get; init; }
public string? ErrorType { get; init; }
public string? ErrorData { get; init; }
public IReadOnlyList<string>? OriginalStackTrace { get; init; }
}

/// <summary>The chained function ran and threw.</summary>
public class InvokeFailedException : InvokeException { }

/// <summary>The chained function did not complete within the configured (or service) timeout.</summary>
public class InvokeTimedOutException : InvokeException { }

/// <summary>The chained execution was stopped by the service before reaching a normal terminal state.</summary>
public class InvokeStoppedException : InvokeException { }

/// <summary>
/// Thrown when a child context operation fails.
/// </summary>
Expand Down
38 changes: 38 additions & 0 deletions Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,44 @@ private static bool IsCallbackErrorTypeString(string? errorType) =>
|| errorType == typeof(CallbackTimeoutException).FullName
|| errorType == typeof(CallbackSubmitterException).FullName
|| errorType == typeof(CallbackException).FullName;

public Task<TResult> InvokeAsync<TPayload, TResult>(
string functionName,
TPayload payload,
string? name = null,
InvokeConfig? config = null,
CancellationToken cancellationToken = default)
=> RunInvoke<TPayload, TResult>(
functionName, payload,
name, config, cancellationToken);

private Task<TResult> RunInvoke<TPayload, TResult>(
string functionName,
TPayload payload,
string? name,
InvokeConfig? config,
CancellationToken cancellationToken)
{
// Argument validation runs synchronously at the call site (matches the
// .NET convention of failing fast for misuse). Match Python/JS/Java
// parity: only check for null/empty here; the durable execution service
// enforces the qualified-ARN rule and surfaces a precise error when an
// unqualified identifier is used.
ArgumentNullException.ThrowIfNull(functionName);
if (string.IsNullOrWhiteSpace(functionName))
throw new ArgumentException("Function name must not be empty or whitespace.", nameof(functionName));

var serializer = LambdaSerializerHelper.GetRequired(LambdaContext);

cancellationToken.ThrowIfCancellationRequested();

var operationId = _idGenerator.NextId();
var op = new InvokeOperation<TPayload, TResult>(
operationId, name, _idGenerator.ParentId, functionName, payload, config,
serializer,
_state, _terminationManager, _durableExecutionArn, _batcher);
return op.ExecuteAsync(cancellationToken);
}
}

internal sealed class WaitForCallbackContext : IWaitForCallbackContext
Expand Down
45 changes: 45 additions & 0 deletions Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,51 @@ Task<T> WaitForCallbackAsync<T>(
string? name = null,
WaitForCallbackConfig? config = null,
CancellationToken cancellationToken = default);

/// <summary>
/// Invoke another durable Lambda function and await its result. The
/// invocation is checkpointed so it survives parent failures and is not
/// double-fired on replay.
/// </summary>
/// <remarks>
/// <para>
/// The chained function runs out-of-process: the SDK checkpoints a
/// <c>CHAINED_INVOKE START</c> with the supplied <paramref name="payload"/>
/// and suspends; the durable execution service runs the target function
/// asynchronously and re-invokes the parent workflow when it completes.
/// On resume, the cached result is deserialized and returned, or the
/// recorded failure surfaces as an <see cref="InvokeException"/> subclass.
/// </para>
/// <para>
/// <paramref name="functionName"/> must be a qualified identifier (version,
/// alias, or <c>$LATEST</c>). Unqualified ARNs are rejected by the durable
/// execution service. The SDK only validates that the value is non-null
/// and non-empty; ARN shape validation is left to the service to keep
/// behavior consistent with the Python, JavaScript, and Java SDKs.
/// </para>
/// <para>
/// The payload and result are serialized to/from a checkpoint using the
/// <see cref="ILambdaSerializer"/> registered on
/// <see cref="ILambdaContext.Serializer"/> (typically configured via
/// <c>LambdaBootstrapBuilder.Create(handler, serializer)</c>). AOT and
/// reflection-based scenarios share this single overload — the AOT story
/// is determined by the registered serializer (e.g.,
/// <c>SourceGeneratorLambdaJsonSerializer&lt;TContext&gt;</c>).
/// </para>
/// </remarks>
/// <typeparam name="TPayload">The payload type sent to the chained function.</typeparam>
/// <typeparam name="TResult">The result type returned by the chained function.</typeparam>
/// <exception cref="ArgumentNullException"><paramref name="functionName"/> is null.</exception>
/// <exception cref="ArgumentException"><paramref name="functionName"/> is empty or whitespace.</exception>
/// <exception cref="InvokeFailedException">The chained function threw.</exception>
/// <exception cref="InvokeTimedOutException">The chained function did not complete within the configured timeout.</exception>
/// <exception cref="InvokeStoppedException">The chained execution was stopped by the service.</exception>
Task<TResult> InvokeAsync<TPayload, TResult>(
string functionName,
TPayload payload,
string? name = null,
InvokeConfig? config = null,
CancellationToken cancellationToken = default);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,5 +146,6 @@ private static bool IsTerminalStatus(string? status) =>
status == OperationStatuses.Succeeded
|| status == OperationStatuses.Failed
|| status == OperationStatuses.Cancelled
|| status == OperationStatuses.Stopped;
|| status == OperationStatuses.Stopped
|| status == OperationStatuses.TimedOut;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
using System.IO;
using System.Text;
using Amazon.Lambda.Core;
using SdkChainedInvokeOptions = Amazon.Lambda.Model.ChainedInvokeOptions;
using SdkOperationUpdate = Amazon.Lambda.Model.OperationUpdate;

namespace Amazon.Lambda.DurableExecution.Internal;

/// <summary>
/// Durable chained-invoke operation. Schedules an asynchronous invocation of
/// another durable Lambda function via the durable execution service and
/// suspends the parent workflow until the chained execution reaches a terminal
/// state. The service drives the chained function and re-invokes the parent
/// with an updated operation status.
/// </summary>
/// <remarks>
/// Replay branches — example:
/// <c>await ctx.InvokeAsync&lt;Req, Resp&gt;("arn:...:fn:prod", req, "process_payment")</c>
/// <list type="bullet">
/// <item><b>Fresh</b>: serialize payload → sync-flush <c>CHAINED_INVOKE START</c>
/// (carrying <see cref="SdkChainedInvokeOptions"/>) → suspend with
/// <see cref="TerminationReason.InvokePending"/>.</item>
/// <item><b>SUCCEEDED</b>: deserialize and return cached result from
/// <c>ChainedInvokeDetails.Result</c>; the chained function is NOT
/// re-invoked.</item>
/// <item><b>FAILED</b>: throw <see cref="InvokeFailedException"/> populated
/// from the recorded error.</item>
/// <item><b>TIMED_OUT</b>: throw <see cref="InvokeTimedOutException"/>.</item>
/// <item><b>STOPPED</b>: throw <see cref="InvokeStoppedException"/>.</item>
/// <item><b>STARTED</b> / <b>PENDING</b>: chained execution is still in
/// flight; re-suspend without re-checkpointing — the original
/// <c>START</c> remains authoritative.</item>
/// </list>
/// Mirrors <see cref="WaitOperation"/>'s "sync-flush START → suspend" idiom;
/// the chained function executes out-of-process so there is nothing to run
/// locally on either fresh or replay paths besides the suspend wiring.
/// Serialization is delegated to the <see cref="ILambdaSerializer"/> registered
/// on <see cref="ILambdaContext.Serializer"/>; AOT-safe and reflection-based
/// callers share the same code path (the AOT story is determined by the
/// registered serializer).
/// </remarks>
internal sealed class InvokeOperation<TPayload, TResult> : DurableOperation<TResult>
{
private readonly string _functionName;
private readonly TPayload _payload;
private readonly InvokeConfig? _config;
private readonly ILambdaSerializer _serializer;

public InvokeOperation(
string operationId,
string? name,
string? parentId,
string functionName,
TPayload payload,
InvokeConfig? config,
ILambdaSerializer serializer,
ExecutionState state,
TerminationManager termination,
string durableExecutionArn,
CheckpointBatcher? batcher = null)
: base(operationId, name, parentId, state, termination, durableExecutionArn, batcher)
{
_functionName = functionName;
_payload = payload;
_config = config;
_serializer = serializer;
}

protected override string OperationType => OperationTypes.ChainedInvoke;

protected override async Task<TResult> StartAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

var serializedPayload = SerializeValue(_payload);

// Sync-flush CHAINED_INVOKE START before suspending — the service
// cannot drive a chained execution it hasn't received. A queued-but-
// unflushed START is indistinguishable from "never enqueued" if the
// parent is recycled, so the parent would resume without the service
// ever invoking the target function.
await EnqueueAsync(new SdkOperationUpdate
{
Id = OperationId,
ParentId = ParentId,
Type = OperationTypes.ChainedInvoke,
Action = OperationAction.START,
SubType = OperationSubTypes.ChainedInvoke,
Name = Name,
Payload = serializedPayload,
ChainedInvokeOptions = new SdkChainedInvokeOptions
{
FunctionName = _functionName,
TenantId = _config?.TenantId
}
}, cancellationToken);

return await Termination.SuspendAndAwait<TResult>(
TerminationReason.InvokePending, $"invoke:{Name ?? _functionName}");
}

protected override Task<TResult> ReplayAsync(Operation existing, CancellationToken cancellationToken)
{
switch (existing.Status)
{
case OperationStatuses.Succeeded:
return Task.FromResult(DeserializeResult(existing.ChainedInvokeDetails?.Result));

case OperationStatuses.Failed:
throw BuildFailed(existing);

case OperationStatuses.TimedOut:
throw BuildTimedOut(existing);

case OperationStatuses.Stopped:
throw BuildStopped(existing);

case OperationStatuses.Started:
case OperationStatuses.Pending:
// Service hasn't reached a terminal state yet; re-suspend
// without re-checkpointing. The original START is still
// authoritative. .NET's checkpoint flow does not need Python's
// pre-suspend status re-check — a synchronous-completion race
// resolves naturally on the next replay because the service
// includes the updated operation status when it re-invokes us.
return Termination.SuspendAndAwait<TResult>(
TerminationReason.InvokePending, $"invoke:{Name ?? _functionName}");

default:
throw new NonDeterministicExecutionException(
$"Chained invoke operation '{Name ?? OperationId}' has unexpected status '{existing.Status}' on replay.");
}
}

private string SerializeValue(TPayload value)
{
using var ms = new MemoryStream();
_serializer.Serialize(value, ms);
return Encoding.UTF8.GetString(ms.ToArray());
}

private TResult DeserializeResult(string? serialized)
{
if (serialized == null) return default!;
var bytes = Encoding.UTF8.GetBytes(serialized);
using var ms = new MemoryStream(bytes);
return _serializer.Deserialize<TResult>(ms);
}

private InvokeFailedException BuildFailed(Operation failedOp)
{
var err = failedOp.ChainedInvokeDetails?.Error;
return new InvokeFailedException(err?.ErrorMessage ?? "Chained invoke failed.")
{
FunctionName = _functionName,
ErrorType = err?.ErrorType,
ErrorData = err?.ErrorData,
OriginalStackTrace = err?.StackTrace
};
}

private InvokeTimedOutException BuildTimedOut(Operation failedOp)
{
var err = failedOp.ChainedInvokeDetails?.Error;
return new InvokeTimedOutException(err?.ErrorMessage ?? "Chained invoke timed out.")
{
FunctionName = _functionName,
ErrorType = err?.ErrorType,
ErrorData = err?.ErrorData,
OriginalStackTrace = err?.StackTrace
};
}

private InvokeStoppedException BuildStopped(Operation failedOp)
{
var err = failedOp.ChainedInvokeDetails?.Error;
return new InvokeStoppedException(err?.ErrorMessage ?? "Chained invoke was stopped.")
{
FunctionName = _functionName,
ErrorType = err?.ErrorType,
ErrorData = err?.ErrorData,
OriginalStackTrace = err?.StackTrace
};
}
}
40 changes: 40 additions & 0 deletions Libraries/src/Amazon.Lambda.DurableExecution/InvokeConfig.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// Configuration for chained invoke operations.
/// </summary>
/// <remarks>
/// Use with <see cref="IDurableContext.InvokeAsync{TPayload, TResult}(string, TPayload, string?, InvokeConfig?, System.Threading.CancellationToken)"/>
/// to configure a single chained invocation. Payload/result serialization is
/// performed by the <see cref="Amazon.Lambda.Core.ILambdaSerializer"/> registered on
/// <see cref="Amazon.Lambda.Core.ILambdaContext.Serializer"/> (typically configured via
/// <c>LambdaBootstrapBuilder.Create(handler, serializer)</c>); there are
/// intentionally no serializer fields here, matching the pattern established
/// by <see cref="StepConfig"/>.
/// </remarks>
public sealed class InvokeConfig
{
/// <summary>
/// Reserved for a future SDK/service contract; not currently honored.
/// </summary>
/// <remarks>
/// The AWS Lambda model <c>ChainedInvokeOptions</c> in
/// <c>AWSSDK.Lambda</c> 4.0.13.1 does not expose a per-invocation timeout
/// field, so this property is not propagated to the durable execution
/// service today. It is kept on the configuration surface so that callers
/// who set it now will pick up the intended behavior automatically once a
/// wire field is added (or via a follow-up SDK update); the service's
/// own service-level timeout still applies in the meantime. Track at
/// <see href="https://issues.amazon.com/issues/DOTNET-8661"/>.
/// </remarks>
[Obsolete("Timeout is reserved for a future SDK/service contract; not currently honored. Tracked at https://issues.amazon.com/issues/DOTNET-8661.")]
public TimeSpan Timeout { get; set; } = TimeSpan.Zero;

/// <summary>
/// Optional tenant identifier propagated to the chained invocation via
/// <c>ChainedInvokeOptions.TenantId</c>. Used to route the invocation to a
/// tenant-isolated function. Matches the <c>tenantId</c> field on the
/// Python, JavaScript, and Java SDKs.
/// </summary>
public string? TenantId { get; set; }
}
Loading
Loading