Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
Expand All @@ -21,6 +22,8 @@
import software.amazon.lambda.durable.client.DurableExecutionClient;
import software.amazon.lambda.durable.client.LambdaDurableFunctionsClient;
import software.amazon.lambda.durable.logging.LoggerConfig;
import software.amazon.lambda.durable.plugin.DurableInstrumentationPlugin;
import software.amazon.lambda.durable.plugin.PluginRunner;
import software.amazon.lambda.durable.retry.PollingStrategies;
import software.amazon.lambda.durable.retry.PollingStrategy;
import software.amazon.lambda.durable.serde.JacksonSerDes;
Expand Down Expand Up @@ -94,6 +97,7 @@ public final class DurableConfig {
private final LoggerConfig loggerConfig;
private final PollingStrategy pollingStrategy;
private final Duration checkpointDelay;
private final PluginRunner pluginRunner;

private DurableConfig(Builder builder) {
this.durableExecutionClient = Objects.requireNonNullElseGet(
Expand All @@ -104,6 +108,7 @@ private DurableConfig(Builder builder) {
this.loggerConfig = Objects.requireNonNullElseGet(builder.loggerConfig, LoggerConfig::defaults);
this.pollingStrategy = Objects.requireNonNullElse(builder.pollingStrategy, PollingStrategies.Presets.DEFAULT);
this.checkpointDelay = Objects.requireNonNullElseGet(builder.checkpointDelay, () -> Duration.ofSeconds(0));
this.pluginRunner = builder.plugins != null ? new PluginRunner(builder.plugins) : PluginRunner.noOp();

validateConfiguration();
}
Expand Down Expand Up @@ -180,6 +185,16 @@ public Duration getCheckpointDelay() {
return checkpointDelay;
}

/**
* Gets the plugin runner that dispatches lifecycle events to registered plugins.
*
* @return PluginRunner instance (never null — returns no-op runner if no plugins configured)
* @experimental This method is experimental and may be changed or removed in future releases.
*/
public PluginRunner getPluginRunner() {
return pluginRunner;
}

public void validateConfiguration() {
if (getDurableExecutionClient() == null) {
throw new IllegalStateException("DurableExecutionClient configuration failed");
Expand Down Expand Up @@ -274,6 +289,7 @@ public static final class Builder {
private LoggerConfig loggerConfig;
private PollingStrategy pollingStrategy;
private Duration checkpointDelay;
private List<DurableInstrumentationPlugin> plugins;

public Builder() {}

Expand Down Expand Up @@ -383,6 +399,32 @@ public Builder withCheckpointDelay(Duration duration) {
return this;
}

/**
* Sets instrumentation plugins for observability and tracing.
*
* <p>Plugins receive lifecycle callbacks at key points during durable execution, enabling integration with
* tracing systems (e.g., OpenTelemetry, X-Ray), custom metrics, and logging enrichment.
*
* <p>Multiple plugins can be provided and will be called in order. Plugin errors are swallowed to prevent
* instrumentation from affecting execution correctness.
*
* <p>Example:
*
* <pre>{@code
* DurableConfig.builder()
* .withPlugins(List.of(new OpenTelemetryDurablePlugin()))
* .build();
* }</pre>
*
* @param plugins list of instrumentation plugins
* @return This builder
* @experimental This method is experimental and may be changed or removed in future releases.
*/
public Builder withPlugins(List<DurableInstrumentationPlugin> plugins) {
this.plugins = plugins;
return this;
}

/**
* Builds the DurableConfig instance.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.plugin;

import java.time.Instant;

/**
* Information provided when an operation attempt ends.
*
* @param id operation ID
* @param name human-readable operation name (may be null)
* @param type operation type
* @param subType operation sub-type (may be null)
* @param parentId parent operation ID (null for root-level operations)
* @param startTimestamp when the attempt started
* @param endTimestamp when the attempt ended
* @param attempt 1-based attempt number
* @param outcome the attempt outcome (SUCCEEDED, FAILED, or RETRYING)
* @param error non-null if the attempt failed
* @param nextAttemptDelaySeconds non-null if outcome is RETRYING
* @experimental This record is experimental and may be changed or removed in future releases.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this tag actually exists in Javadoc, I think people use @deprecated instead with an explanation that it's experimental.

*/
public record AttemptEndInfo(
String id,
String name,
String type,
String subType,
String parentId,
Instant startTimestamp,
Instant endTimestamp,
int attempt,
AttemptOutcome outcome,
Throwable error,
Integer nextAttemptDelaySeconds) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.plugin;

import java.time.Instant;

/**
* Attempt-level information for step retry hooks.
*
* @param id operation ID
* @param name human-readable operation name (may be null)
* @param type operation type
* @param subType operation sub-type (may be null)
* @param parentId parent operation ID (null for root-level operations)
* @param startTimestamp when the attempt started
* @param attempt 1-based attempt number
* @experimental This record is experimental and may be changed or removed in future releases.
*/
public record AttemptInfo(
String id, String name, String type, String subType, String parentId, Instant startTimestamp, int attempt) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.plugin;

/**
* Possible outcomes for an operation attempt.
*
* @experimental This enum is experimental and may be changed or removed in future releases.
*/
public enum AttemptOutcome {
SUCCEEDED,
FAILED,
RETRYING
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.plugin;

import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

/**
* Plugin interface for instrumenting durable execution lifecycle events.
*
* <p>Implement this interface to integrate observability tools (OpenTelemetry, Datadog, etc.) with the durable
* execution SDK. The SDK calls these hooks at key lifecycle points without requiring modifications to core SDK code.
*
* <p>All methods have default no-op implementations, allowing plugins to override only the hooks they need.
*
* <p>Plugin errors are isolated — exceptions thrown by plugin methods are caught and logged but never disrupt SDK
* execution.
*
* @experimental This interface is experimental and may be changed or removed in future releases.
*/
public interface DurableInstrumentationPlugin {

// ─── Execution-level hooks ───────────────────────────────────────────

/**
* Called once when an execution first starts (not on replay invocations). Use for sampling decisions or
* execution-level span creation.
*/
default void onExecutionStart(InvocationInfo info) {}

/**
* Called when an execution reaches a terminal state (succeeded or failed). Use for writing summary records or
* flushing final data.
*
* <p>This hook is awaited — the SDK blocks until it returns before sending the response.
*/
default void onExecutionEnd(ExecutionEndInfo info) {}

// ─── Invocation-level hooks ──────────────────────────────────────────

/**
* Called at the start of each Lambda invocation. Use to set up per-invocation state (trace ID, invocation span).
*/
default void onInvocationStart(InvocationInfo info) {}

/**
* Wraps the entire invocation execution. Use this to set the active OTel context for the invocation scope.
*
* <p>This hook runs on the same thread as the user handler, making it suitable for {@code Context.makeCurrent()}
* which uses ThreadLocal.
*
* @param info invocation metadata
* @param fn the SDK invocation logic to execute
* @return the result of executing fn
*/
default <T> T wrapInvocation(InvocationInfo info, Supplier<T> fn) {
return fn.get();
}

/**
* Called at the end of each Lambda invocation. Use to flush spans/metrics before Lambda freezes.
*
* <p>This hook is awaited — the SDK blocks until it returns. This is the only safe flush point before Lambda
* freezes the execution environment.
*/
default void onInvocationEnd(InvocationInfo info) {}

// ─── Operation-level hooks ───────────────────────────────────────────

/**
* Called when an operation starts for the first time (not on replay). Use to record when an operation genuinely
* began.
*/
default void onOperationFirstStart(OperationInfo info) {}

/** Called when an operation starts (including replay). Use for logging/metrics that want replay visibility. */
default void onOperationStart(OperationInfo info) {}

/**
* Wraps child context execution (runInChildContext, map iterations, parallel branches). Use this to set the active
* OTel context for child context scopes.
*
* <p>This hook runs on the child context thread, making it suitable for {@code Context.makeCurrent()}.
*
* @param info operation metadata
* @param fn the child context logic to execute
* @return the result of executing fn
*/
default <T> T wrapChildContextFn(OperationInfo info, Supplier<T> fn) {
return fn.get();
}

/**
* Called when an operation completes for the first time (not on replay). The OTel plugin creates the operation span
* here with backfilled start/end timestamps.
*/
default void onOperationFirstEnd(OperationEndInfo info) {}

// ─── Operation Attempt-level hooks ───────────────────────────────────

/** Called before each attempt of a retryable operation (step, waitForCondition). Use to start an attempt span. */
default void onOperationAttemptStart(AttemptInfo info) {}

/**
* Wraps a single operation attempt execution. Use this to set the active OTel context so auto-instrumented API
* calls within the attempt are correctly parented under the attempt span.
*
* <p>This hook runs on the step thread, making it suitable for {@code Context.makeCurrent()}.
*
* @param info attempt metadata
* @param fn the attempt logic to execute
* @return the result of executing fn
*/
default <T> T wrapOperationAttemptFn(AttemptInfo info, Supplier<T> fn) {
return fn.get();
}

/**
* Called after each attempt completes (succeeded, failed, or retrying). Use to end the attempt span and record
* outcome/errors.
*/
default void onOperationAttemptEnd(AttemptEndInfo info) {}

// ─── Utility hooks ───────────────────────────────────────────────────

/**
* Called when a checkpoint response contains operation status changes. Use for real-time dashboards or metrics on
* operation state transitions.
*/
default void onOperationChange(OperationChangeInfo info) {}

/**
* Returns additional key-value pairs to include in SDK log context via MDC. Called on each log emission. Return
* empty map for no enrichment.
*
* <p>Implementations must be thread-safe as this may be called from any thread.
*/
default Map<String, String> enrichLogContext() {
return Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.plugin;

import java.util.Map;
import software.amazon.awssdk.services.lambda.model.Operation;

/**
* Information provided when a durable execution ends.
*
* @param requestId the Lambda request ID
* @param executionArn the durable execution ARN
* @param status the execution outcome (SUCCEEDED or FAILED)
* @param executionResult the result if succeeded (may be null)
* @param executionError the error if failed (may be null)
* @param executionInput the original execution input
* @param operations all operations in the execution (final state)
* @experimental This record is experimental and may be changed or removed in future releases.
*/
public record ExecutionEndInfo(
String requestId,
String executionArn,
ExecutionStatus status,
Object executionResult,
Throwable executionError,
Object executionInput,
Map<String, Operation> operations) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.plugin;

/**
* Terminal status of a durable execution.
*
* @experimental This enum is experimental and may be changed or removed in future releases.
*/
public enum ExecutionStatus {
SUCCEEDED,
FAILED
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.plugin;

/**
* Invocation-level information available to plugin hooks.
*
* @param requestId the Lambda request ID for this invocation
* @param executionArn the durable execution ARN
* @experimental This record is experimental and may be changed or removed in future releases.
*/
public record InvocationInfo(String requestId, String executionArn) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.plugin;

import java.util.Map;
import software.amazon.awssdk.services.lambda.model.Operation;

/**
* Information about operation state changes from a checkpoint response.
*
* @param requestId the Lambda request ID
* @param executionArn the durable execution ARN
* @param updatedOperations operations whose status changed in this checkpoint response
* @param operations all operations in the execution (current state)
* @experimental This record is experimental and may be changed or removed in future releases.
*/
public record OperationChangeInfo(
String requestId,
String executionArn,
Map<String, Operation> updatedOperations,
Map<String, Operation> operations) {}
Loading
Loading