From ecbd5859dd85a02721d3c1ef8434b7275e70f0da Mon Sep 17 00:00:00 2001 From: Ayushi Ahjolia Date: Wed, 27 May 2026 10:05:12 -0700 Subject: [PATCH] feat(sdk): Add DurableInstrumentationPlugin interface and plugin runner --- .../amazon/lambda/durable/DurableConfig.java | 42 ++ .../lambda/durable/plugin/AttemptEndInfo.java | 34 ++ .../lambda/durable/plugin/AttemptInfo.java | 20 + .../lambda/durable/plugin/AttemptOutcome.java | 14 + .../plugin/DurableInstrumentationPlugin.java | 142 +++++ .../durable/plugin/ExecutionEndInfo.java | 27 + .../durable/plugin/ExecutionStatus.java | 13 + .../lambda/durable/plugin/InvocationInfo.java | 12 + .../durable/plugin/OperationChangeInfo.java | 21 + .../durable/plugin/OperationEndInfo.java | 28 + .../lambda/durable/plugin/OperationInfo.java | 28 + .../durable/plugin/PluginInfoConverter.java | 165 ++++++ .../lambda/durable/plugin/PluginRunner.java | 210 ++++++++ .../plugin/PluginInfoConverterTest.java | 254 +++++++++ .../durable/plugin/PluginRunnerTest.java | 489 ++++++++++++++++++ 15 files changed, 1499 insertions(+) create mode 100644 sdk/src/main/java/software/amazon/lambda/durable/plugin/AttemptEndInfo.java create mode 100644 sdk/src/main/java/software/amazon/lambda/durable/plugin/AttemptInfo.java create mode 100644 sdk/src/main/java/software/amazon/lambda/durable/plugin/AttemptOutcome.java create mode 100644 sdk/src/main/java/software/amazon/lambda/durable/plugin/DurableInstrumentationPlugin.java create mode 100644 sdk/src/main/java/software/amazon/lambda/durable/plugin/ExecutionEndInfo.java create mode 100644 sdk/src/main/java/software/amazon/lambda/durable/plugin/ExecutionStatus.java create mode 100644 sdk/src/main/java/software/amazon/lambda/durable/plugin/InvocationInfo.java create mode 100644 sdk/src/main/java/software/amazon/lambda/durable/plugin/OperationChangeInfo.java create mode 100644 sdk/src/main/java/software/amazon/lambda/durable/plugin/OperationEndInfo.java create mode 100644 sdk/src/main/java/software/amazon/lambda/durable/plugin/OperationInfo.java create mode 100644 sdk/src/main/java/software/amazon/lambda/durable/plugin/PluginInfoConverter.java create mode 100644 sdk/src/main/java/software/amazon/lambda/durable/plugin/PluginRunner.java create mode 100644 sdk/src/test/java/software/amazon/lambda/durable/plugin/PluginInfoConverterTest.java create mode 100644 sdk/src/test/java/software/amazon/lambda/durable/plugin/PluginRunnerTest.java diff --git a/sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java b/sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java index f9294f589..e53b6bb88 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java @@ -5,6 +5,7 @@ import java.io.IOException; import java.io.InputStream; import java.time.Duration; +import java.util.List; import java.util.Objects; import java.util.Properties; import java.util.concurrent.ExecutorService; @@ -21,6 +22,8 @@ import software.amazon.lambda.durable.client.DurableExecutionClient; import software.amazon.lambda.durable.client.LambdaDurableFunctionsClient; import software.amazon.lambda.durable.logging.LoggerConfig; +import software.amazon.lambda.durable.plugin.DurableInstrumentationPlugin; +import software.amazon.lambda.durable.plugin.PluginRunner; import software.amazon.lambda.durable.retry.PollingStrategies; import software.amazon.lambda.durable.retry.PollingStrategy; import software.amazon.lambda.durable.serde.JacksonSerDes; @@ -94,6 +97,7 @@ public final class DurableConfig { private final LoggerConfig loggerConfig; private final PollingStrategy pollingStrategy; private final Duration checkpointDelay; + private final PluginRunner pluginRunner; private DurableConfig(Builder builder) { this.durableExecutionClient = Objects.requireNonNullElseGet( @@ -104,6 +108,7 @@ private DurableConfig(Builder builder) { this.loggerConfig = Objects.requireNonNullElseGet(builder.loggerConfig, LoggerConfig::defaults); this.pollingStrategy = Objects.requireNonNullElse(builder.pollingStrategy, PollingStrategies.Presets.DEFAULT); this.checkpointDelay = Objects.requireNonNullElseGet(builder.checkpointDelay, () -> Duration.ofSeconds(0)); + this.pluginRunner = builder.plugins != null ? new PluginRunner(builder.plugins) : PluginRunner.noOp(); validateConfiguration(); } @@ -180,6 +185,16 @@ public Duration getCheckpointDelay() { return checkpointDelay; } + /** + * Gets the plugin runner that dispatches lifecycle events to registered plugins. + * + * @return PluginRunner instance (never null — returns no-op runner if no plugins configured) + * @experimental This method is experimental and may be changed or removed in future releases. + */ + public PluginRunner getPluginRunner() { + return pluginRunner; + } + public void validateConfiguration() { if (getDurableExecutionClient() == null) { throw new IllegalStateException("DurableExecutionClient configuration failed"); @@ -274,6 +289,7 @@ public static final class Builder { private LoggerConfig loggerConfig; private PollingStrategy pollingStrategy; private Duration checkpointDelay; + private List plugins; public Builder() {} @@ -383,6 +399,32 @@ public Builder withCheckpointDelay(Duration duration) { return this; } + /** + * Sets instrumentation plugins for observability and tracing. + * + *

Plugins receive lifecycle callbacks at key points during durable execution, enabling integration with + * tracing systems (e.g., OpenTelemetry, X-Ray), custom metrics, and logging enrichment. + * + *

Multiple plugins can be provided and will be called in order. Plugin errors are swallowed to prevent + * instrumentation from affecting execution correctness. + * + *

Example: + * + *

{@code
+         * DurableConfig.builder()
+         *     .withPlugins(List.of(new OpenTelemetryDurablePlugin()))
+         *     .build();
+         * }
+ * + * @param plugins list of instrumentation plugins + * @return This builder + * @experimental This method is experimental and may be changed or removed in future releases. + */ + public Builder withPlugins(List plugins) { + this.plugins = plugins; + return this; + } + /** * Builds the DurableConfig instance. * diff --git a/sdk/src/main/java/software/amazon/lambda/durable/plugin/AttemptEndInfo.java b/sdk/src/main/java/software/amazon/lambda/durable/plugin/AttemptEndInfo.java new file mode 100644 index 000000000..3ea562423 --- /dev/null +++ b/sdk/src/main/java/software/amazon/lambda/durable/plugin/AttemptEndInfo.java @@ -0,0 +1,34 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.plugin; + +import java.time.Instant; + +/** + * Information provided when an operation attempt ends. + * + * @param id operation ID + * @param name human-readable operation name (may be null) + * @param type operation type + * @param subType operation sub-type (may be null) + * @param parentId parent operation ID (null for root-level operations) + * @param startTimestamp when the attempt started + * @param endTimestamp when the attempt ended + * @param attempt 1-based attempt number + * @param outcome the attempt outcome (SUCCEEDED, FAILED, or RETRYING) + * @param error non-null if the attempt failed + * @param nextAttemptDelaySeconds non-null if outcome is RETRYING + *

Preview API: This API is experimental and may be changed or removed in future releases. + */ +public record AttemptEndInfo( + String id, + String name, + String type, + String subType, + String parentId, + Instant startTimestamp, + Instant endTimestamp, + int attempt, + AttemptOutcome outcome, + Throwable error, + Integer nextAttemptDelaySeconds) {} diff --git a/sdk/src/main/java/software/amazon/lambda/durable/plugin/AttemptInfo.java b/sdk/src/main/java/software/amazon/lambda/durable/plugin/AttemptInfo.java new file mode 100644 index 000000000..81da3d4fb --- /dev/null +++ b/sdk/src/main/java/software/amazon/lambda/durable/plugin/AttemptInfo.java @@ -0,0 +1,20 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.plugin; + +import java.time.Instant; + +/** + * Attempt-level information for step retry hooks. + * + * @param id operation ID + * @param name human-readable operation name (may be null) + * @param type operation type + * @param subType operation sub-type (may be null) + * @param parentId parent operation ID (null for root-level operations) + * @param startTimestamp when the attempt started + * @param attempt 1-based attempt number + *

Preview API: This API is experimental and may be changed or removed in future releases. + */ +public record AttemptInfo( + String id, String name, String type, String subType, String parentId, Instant startTimestamp, int attempt) {} diff --git a/sdk/src/main/java/software/amazon/lambda/durable/plugin/AttemptOutcome.java b/sdk/src/main/java/software/amazon/lambda/durable/plugin/AttemptOutcome.java new file mode 100644 index 000000000..5dab38612 --- /dev/null +++ b/sdk/src/main/java/software/amazon/lambda/durable/plugin/AttemptOutcome.java @@ -0,0 +1,14 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.plugin; + +/** + * Possible outcomes for an operation attempt. + * + *

Preview API: This API is experimental and may be changed or removed in future releases. + */ +public enum AttemptOutcome { + SUCCEEDED, + FAILED, + RETRYING +} diff --git a/sdk/src/main/java/software/amazon/lambda/durable/plugin/DurableInstrumentationPlugin.java b/sdk/src/main/java/software/amazon/lambda/durable/plugin/DurableInstrumentationPlugin.java new file mode 100644 index 000000000..3c62d1a4e --- /dev/null +++ b/sdk/src/main/java/software/amazon/lambda/durable/plugin/DurableInstrumentationPlugin.java @@ -0,0 +1,142 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.plugin; + +import java.util.Collections; +import java.util.Map; +import java.util.function.Supplier; + +/** + * Plugin interface for instrumenting durable execution lifecycle events. + * + *

Implement this interface to integrate observability tools (OpenTelemetry, Datadog, etc.) with the durable + * execution SDK. The SDK calls these hooks at key lifecycle points without requiring modifications to core SDK code. + * + *

All methods have default no-op implementations, allowing plugins to override only the hooks they need. + * + *

Plugin errors are isolated — exceptions thrown by plugin methods are caught and logged but never disrupt SDK + * execution. + * + *

Preview API: This API is experimental and may be changed or removed in future releases. + */ +public interface DurableInstrumentationPlugin { + + // ─── Execution-level hooks ─────────────────────────────────────────── + + /** + * Called once when an execution first starts (not on replay invocations). Use for sampling decisions or + * execution-level span creation. + */ + default void onExecutionStart(InvocationInfo info) {} + + /** + * Called when an execution reaches a terminal state (succeeded or failed). Use for writing summary records or + * flushing final data. + * + *

This hook is awaited — the SDK blocks until it returns before sending the response. + */ + default void onExecutionEnd(ExecutionEndInfo info) {} + + // ─── Invocation-level hooks ────────────────────────────────────────── + + /** + * Called at the start of each Lambda invocation. Use to set up per-invocation state (trace ID, invocation span). + */ + default void onInvocationStart(InvocationInfo info) {} + + /** + * Wraps the entire invocation execution. Use this to set the active OTel context for the invocation scope. + * + *

This hook runs on the same thread as the user handler, making it suitable for {@code Context.makeCurrent()} + * which uses ThreadLocal. + * + * @param info invocation metadata + * @param fn the SDK invocation logic to execute + * @return the result of executing fn + */ + default T wrapInvocation(InvocationInfo info, Supplier fn) { + return fn.get(); + } + + /** + * Called at the end of each Lambda invocation. Use to flush spans/metrics before Lambda freezes. + * + *

This hook is awaited — the SDK blocks until it returns. This is the only safe flush point before Lambda + * freezes the execution environment. + */ + default void onInvocationEnd(InvocationInfo info) {} + + // ─── Operation-level hooks ─────────────────────────────────────────── + + /** + * Called when an operation starts for the first time (not on replay). Use to record when an operation genuinely + * began. + */ + default void onOperationFirstStart(OperationInfo info) {} + + /** Called when an operation starts (including replay). Use for logging/metrics that want replay visibility. */ + default void onOperationStart(OperationInfo info) {} + + /** + * Wraps child context execution (runInChildContext, map iterations, parallel branches). Use this to set the active + * OTel context for child context scopes. + * + *

This hook runs on the child context thread, making it suitable for {@code Context.makeCurrent()}. + * + * @param info operation metadata + * @param fn the child context logic to execute + * @return the result of executing fn + */ + default T wrapChildContextFn(OperationInfo info, Supplier fn) { + return fn.get(); + } + + /** + * Called when an operation completes for the first time (not on replay). The OTel plugin creates the operation span + * here with backfilled start/end timestamps. + */ + default void onOperationFirstEnd(OperationEndInfo info) {} + + // ─── Operation Attempt-level hooks ─────────────────────────────────── + + /** Called before each attempt of a retryable operation (step, waitForCondition). Use to start an attempt span. */ + default void onOperationAttemptStart(AttemptInfo info) {} + + /** + * Wraps a single operation attempt execution. Use this to set the active OTel context so auto-instrumented API + * calls within the attempt are correctly parented under the attempt span. + * + *

This hook runs on the step thread, making it suitable for {@code Context.makeCurrent()}. + * + * @param info attempt metadata + * @param fn the attempt logic to execute + * @return the result of executing fn + */ + default T wrapOperationAttemptFn(AttemptInfo info, Supplier fn) { + return fn.get(); + } + + /** + * Called after each attempt completes (succeeded, failed, or retrying). Use to end the attempt span and record + * outcome/errors. + */ + default void onOperationAttemptEnd(AttemptEndInfo info) {} + + // ─── Utility hooks ─────────────────────────────────────────────────── + + /** + * Called when a checkpoint response contains operation status changes. Use for real-time dashboards or metrics on + * operation state transitions. + */ + default void onOperationChange(OperationChangeInfo info) {} + + /** + * Returns additional key-value pairs to include in SDK log context via MDC. Called on each log emission. Return + * empty map for no enrichment. + * + *

Implementations must be thread-safe as this may be called from any thread. + */ + default Map enrichLogContext() { + return Collections.emptyMap(); + } +} diff --git a/sdk/src/main/java/software/amazon/lambda/durable/plugin/ExecutionEndInfo.java b/sdk/src/main/java/software/amazon/lambda/durable/plugin/ExecutionEndInfo.java new file mode 100644 index 000000000..587d02009 --- /dev/null +++ b/sdk/src/main/java/software/amazon/lambda/durable/plugin/ExecutionEndInfo.java @@ -0,0 +1,27 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.plugin; + +import java.util.Map; +import software.amazon.awssdk.services.lambda.model.Operation; + +/** + * Information provided when a durable execution ends. + * + * @param requestId the Lambda request ID + * @param executionArn the durable execution ARN + * @param status the execution outcome (SUCCEEDED or FAILED) + * @param executionResult the result if succeeded (may be null) + * @param executionError the error if failed (may be null) + * @param executionInput the original execution input + * @param operations all operations in the execution (final state) + *

Preview API: This API is experimental and may be changed or removed in future releases. + */ +public record ExecutionEndInfo( + String requestId, + String executionArn, + ExecutionStatus status, + Object executionResult, + Throwable executionError, + Object executionInput, + Map operations) {} diff --git a/sdk/src/main/java/software/amazon/lambda/durable/plugin/ExecutionStatus.java b/sdk/src/main/java/software/amazon/lambda/durable/plugin/ExecutionStatus.java new file mode 100644 index 000000000..5bef53350 --- /dev/null +++ b/sdk/src/main/java/software/amazon/lambda/durable/plugin/ExecutionStatus.java @@ -0,0 +1,13 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.plugin; + +/** + * Terminal status of a durable execution. + * + *

Preview API: This API is experimental and may be changed or removed in future releases. + */ +public enum ExecutionStatus { + SUCCEEDED, + FAILED +} diff --git a/sdk/src/main/java/software/amazon/lambda/durable/plugin/InvocationInfo.java b/sdk/src/main/java/software/amazon/lambda/durable/plugin/InvocationInfo.java new file mode 100644 index 000000000..105ba1f5d --- /dev/null +++ b/sdk/src/main/java/software/amazon/lambda/durable/plugin/InvocationInfo.java @@ -0,0 +1,12 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.plugin; + +/** + * Invocation-level information available to plugin hooks. + * + * @param requestId the Lambda request ID for this invocation + * @param executionArn the durable execution ARN + *

Preview API: This API is experimental and may be changed or removed in future releases. + */ +public record InvocationInfo(String requestId, String executionArn) {} diff --git a/sdk/src/main/java/software/amazon/lambda/durable/plugin/OperationChangeInfo.java b/sdk/src/main/java/software/amazon/lambda/durable/plugin/OperationChangeInfo.java new file mode 100644 index 000000000..32491c966 --- /dev/null +++ b/sdk/src/main/java/software/amazon/lambda/durable/plugin/OperationChangeInfo.java @@ -0,0 +1,21 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.plugin; + +import java.util.Map; +import software.amazon.awssdk.services.lambda.model.Operation; + +/** + * Information about operation state changes from a checkpoint response. + * + * @param requestId the Lambda request ID + * @param executionArn the durable execution ARN + * @param updatedOperations operations whose status changed in this checkpoint response + * @param operations all operations in the execution (current state) + *

Preview API: This API is experimental and may be changed or removed in future releases. + */ +public record OperationChangeInfo( + String requestId, + String executionArn, + Map updatedOperations, + Map operations) {} diff --git a/sdk/src/main/java/software/amazon/lambda/durable/plugin/OperationEndInfo.java b/sdk/src/main/java/software/amazon/lambda/durable/plugin/OperationEndInfo.java new file mode 100644 index 000000000..0c241160c --- /dev/null +++ b/sdk/src/main/java/software/amazon/lambda/durable/plugin/OperationEndInfo.java @@ -0,0 +1,28 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.plugin; + +import java.time.Instant; + +/** + * Extended operation information for operation end events. + * + * @param id operation ID + * @param name human-readable operation name (may be null) + * @param type operation type + * @param subType operation sub-type (may be null) + * @param parentId parent operation ID (null for root-level operations) + * @param startTimestamp when the operation started + * @param endTimestamp when the operation ended + * @param error non-null if the operation failed + *

Preview API: This API is experimental and may be changed or removed in future releases. + */ +public record OperationEndInfo( + String id, + String name, + String type, + String subType, + String parentId, + Instant startTimestamp, + Instant endTimestamp, + Throwable error) {} diff --git a/sdk/src/main/java/software/amazon/lambda/durable/plugin/OperationInfo.java b/sdk/src/main/java/software/amazon/lambda/durable/plugin/OperationInfo.java new file mode 100644 index 000000000..93513aceb --- /dev/null +++ b/sdk/src/main/java/software/amazon/lambda/durable/plugin/OperationInfo.java @@ -0,0 +1,28 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.plugin; + +import java.time.Instant; + +/** + * Operation-level information available to plugin hooks. + * + *

Field names mirror the {@code Operation} type from the AWS SDK for consistency. + * + * @param id operation ID (unique within the execution) + * @param name human-readable operation name (may be null) + * @param type operation type (STEP, WAIT, CONTEXT, CHAINED_INVOKE, CALLBACK) + * @param subType operation sub-type (Map, Parallel, RunInChildContext, WaitForCondition, etc.) — may be null + * @param parentId parent operation ID (null for root-level operations) + * @param startTimestamp when the operation started (may be from a prior invocation) + * @param endTimestamp when the operation ended (null if still running) + *

Preview API: This API is experimental and may be changed or removed in future releases. + */ +public record OperationInfo( + String id, + String name, + String type, + String subType, + String parentId, + Instant startTimestamp, + Instant endTimestamp) {} diff --git a/sdk/src/main/java/software/amazon/lambda/durable/plugin/PluginInfoConverter.java b/sdk/src/main/java/software/amazon/lambda/durable/plugin/PluginInfoConverter.java new file mode 100644 index 000000000..d86d54d6c --- /dev/null +++ b/sdk/src/main/java/software/amazon/lambda/durable/plugin/PluginInfoConverter.java @@ -0,0 +1,165 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.plugin; + +import java.time.Instant; +import software.amazon.awssdk.services.lambda.model.Operation; +import software.amazon.awssdk.services.lambda.model.OperationType; +import software.amazon.lambda.durable.model.OperationSubType; + +/** + * Utility methods for converting SDK internal types to plugin info records. + * + *

Preview API: This API is experimental and may be changed or removed in future releases. + */ +public final class PluginInfoConverter { + + private PluginInfoConverter() {} + + /** + * Converts an SDK {@link Operation} to an {@link OperationInfo} for plugin hooks. + * + * @param operation the SDK operation (may be null for first-start scenarios) + * @param operationId the operation ID + * @param name the operation name + * @param type the operation type + * @param subType the operation sub-type (may be null) + * @param parentId the parent operation ID (may be null for root operations) + * @return an OperationInfo record + */ + public static OperationInfo toOperationInfo( + Operation operation, + String operationId, + String name, + OperationType type, + OperationSubType subType, + String parentId) { + return new OperationInfo( + operationId, + name, + type != null ? type.toString() : null, + subType != null ? subType.getValue() : null, + parentId, + operation != null ? operation.startTimestamp() : null, + operation != null ? operation.endTimestamp() : null); + } + + /** + * Converts an SDK {@link Operation} to an {@link OperationInfo} using the operation's own fields. + * + * @param operation the SDK operation + * @return an OperationInfo record + */ + public static OperationInfo toOperationInfo(Operation operation) { + if (operation == null) { + return new OperationInfo(null, null, null, null, null, null, null); + } + return new OperationInfo( + operation.id(), + operation.name(), + operation.type() != null ? operation.type().toString() : null, + operation.subType(), + operation.parentId(), + operation.startTimestamp(), + operation.endTimestamp()); + } + + /** + * Creates an {@link OperationEndInfo} from an SDK {@link Operation} and an optional error. + * + * @param operation the completed SDK operation + * @param operationId the operation ID + * @param name the operation name + * @param type the operation type + * @param subType the operation sub-type (may be null) + * @param parentId the parent operation ID (may be null) + * @param error the error if the operation failed (may be null) + * @return an OperationEndInfo record + */ + public static OperationEndInfo toOperationEndInfo( + Operation operation, + String operationId, + String name, + OperationType type, + OperationSubType subType, + String parentId, + Throwable error) { + return new OperationEndInfo( + operationId, + name, + type != null ? type.toString() : null, + subType != null ? subType.getValue() : null, + parentId, + operation != null ? operation.startTimestamp() : null, + operation != null ? operation.endTimestamp() : null, + error); + } + + /** + * Creates an {@link AttemptInfo} for a step or waitForCondition attempt. + * + * @param operationId the operation ID + * @param name the operation name + * @param type the operation type + * @param subType the operation sub-type (may be null) + * @param parentId the parent operation ID (may be null) + * @param attempt the 1-based attempt number + * @return an AttemptInfo record + */ + public static AttemptInfo toAttemptInfo( + String operationId, + String name, + OperationType type, + OperationSubType subType, + String parentId, + int attempt) { + return new AttemptInfo( + operationId, + name, + type != null ? type.toString() : null, + subType != null ? subType.getValue() : null, + parentId, + Instant.now(), + attempt); + } + + /** + * Creates an {@link AttemptEndInfo} for a completed attempt. + * + * @param operationId the operation ID + * @param name the operation name + * @param type the operation type + * @param subType the operation sub-type (may be null) + * @param parentId the parent operation ID (may be null) + * @param startTimestamp when the attempt started + * @param attempt the 1-based attempt number + * @param outcome the attempt outcome + * @param error the error if the attempt failed (may be null) + * @param nextAttemptDelaySeconds delay before next retry (null if not retrying) + * @return an AttemptEndInfo record + */ + public static AttemptEndInfo toAttemptEndInfo( + String operationId, + String name, + OperationType type, + OperationSubType subType, + String parentId, + Instant startTimestamp, + int attempt, + AttemptOutcome outcome, + Throwable error, + Integer nextAttemptDelaySeconds) { + return new AttemptEndInfo( + operationId, + name, + type != null ? type.toString() : null, + subType != null ? subType.getValue() : null, + parentId, + startTimestamp, + Instant.now(), + attempt, + outcome, + error, + nextAttemptDelaySeconds); + } +} diff --git a/sdk/src/main/java/software/amazon/lambda/durable/plugin/PluginRunner.java b/sdk/src/main/java/software/amazon/lambda/durable/plugin/PluginRunner.java new file mode 100644 index 000000000..0333c416a --- /dev/null +++ b/sdk/src/main/java/software/amazon/lambda/durable/plugin/PluginRunner.java @@ -0,0 +1,210 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.plugin; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Composes multiple {@link DurableInstrumentationPlugin} instances into a single dispatcher. + * + *

Event hooks are fire-and-forget: each plugin is called in order, errors are swallowed. Wrap hooks are chained: the + * innermost plugin wraps the SDK function, and each outer plugin wraps the next. + * + *

{@code onInvocationEnd} and {@code onExecutionEnd} are awaited (the SDK blocks until they return) to allow plugins + * to flush data before Lambda freezes. + * + *

Preview API: This API is experimental and may be changed or removed in future releases. + */ +public class PluginRunner { + + private static final Logger logger = LoggerFactory.getLogger(PluginRunner.class); + private static final PluginRunner NO_OP = new PluginRunner(Collections.emptyList()); + + private final List plugins; + + public PluginRunner(List plugins) { + this.plugins = plugins != null ? List.copyOf(plugins) : Collections.emptyList(); + } + + /** Returns a no-op runner that does nothing. */ + public static PluginRunner noOp() { + return NO_OP; + } + + /** Returns true if no plugins are registered. */ + public boolean isEmpty() { + return plugins.isEmpty(); + } + + // ─── Fire-and-forget event hooks ───────────────────────────────────── + + /** Calls a void hook on all plugins, swallowing any errors. */ + private void run(Consumer hook) { + for (var plugin : plugins) { + try { + hook.accept(plugin); + } catch (Exception e) { + logger.warn("Plugin hook threw exception", e); + } + } + } + + public void onExecutionStart(InvocationInfo info) { + run(p -> p.onExecutionStart(info)); + } + + public void onInvocationStart(InvocationInfo info) { + run(p -> p.onInvocationStart(info)); + } + + public void onOperationFirstStart(OperationInfo info) { + run(p -> p.onOperationFirstStart(info)); + } + + public void onOperationStart(OperationInfo info) { + run(p -> p.onOperationStart(info)); + } + + public void onOperationFirstEnd(OperationEndInfo info) { + run(p -> p.onOperationFirstEnd(info)); + } + + public void onOperationAttemptStart(AttemptInfo info) { + run(p -> p.onOperationAttemptStart(info)); + } + + public void onOperationAttemptEnd(AttemptEndInfo info) { + run(p -> p.onOperationAttemptEnd(info)); + } + + public void onOperationChange(OperationChangeInfo info) { + run(p -> p.onOperationChange(info)); + } + + // ─── Awaited hooks (SDK blocks until return) ───────────────────────── + + public void onExecutionEnd(ExecutionEndInfo info) { + for (var plugin : plugins) { + try { + plugin.onExecutionEnd(info); + } catch (Exception e) { + logger.warn("Plugin onExecutionEnd threw exception", e); + } + } + } + + public void onInvocationEnd(InvocationInfo info) { + for (var plugin : plugins) { + try { + plugin.onInvocationEnd(info); + } catch (Exception e) { + logger.warn("Plugin onInvocationEnd threw exception", e); + } + } + } + + // ─── Wrap hooks (chained, errors don't swallow user function) ──────── + + /** + * Chains wrap hooks from all plugins around the given function. Plugins are applied in order (first plugin is + * outermost wrapper). If a plugin's wrap hook throws, the SDK falls back to executing without that plugin's wrap. + * + *

Critically, if the user function (fn) throws, that error is always propagated — plugins cannot swallow it. + */ + private T runAsCallback( + Function, T>> hookExtractor, Supplier fn) { + if (plugins.isEmpty()) { + return fn.get(); + } + + // Capture any error thrown by the user function + @SuppressWarnings("unchecked") + Throwable[] fnError = {null}; + + // Wrap the original fn to capture any error it throws + Supplier guardedFn = () -> { + try { + return fn.get(); + } catch (Throwable err) { + fnError[0] = err; + throw err; // re-throw so plugins still see it + } + }; + + // Build the chain: innermost is guardedFn, each plugin wraps the next + Supplier chain = guardedFn; + for (int i = plugins.size() - 1; i >= 0; i--) { + var plugin = plugins.get(i); + var next = chain; + chain = () -> { + try { + return hookExtractor.apply(plugin).apply(next); + } catch (Throwable e) { + // If this is the user function error bubbling up, let it propagate + if (fnError[0] != null && e == fnError[0]) { + sneakyThrow(e); + } + // Plugin error — fall back to calling next without this plugin's wrap + logger.warn("Plugin wrap hook threw exception, executing without wrap", e); + return next.get(); + } + }; + } + + T result = chain.get(); + + // If fn threw but the chain swallowed it, re-throw + if (fnError[0] != null) { + sneakyThrow(fnError[0]); + } + + return result; + } + + @SuppressWarnings("unchecked") + private static void sneakyThrow(Throwable e) throws E { + throw (E) e; + } + + public T wrapInvocation(InvocationInfo info, Supplier fn) { + if (plugins.isEmpty()) return fn.get(); + return runAsCallback(p -> (next) -> p.wrapInvocation(info, next), fn); + } + + public T wrapChildContextFn(OperationInfo info, Supplier fn) { + if (plugins.isEmpty()) return fn.get(); + return runAsCallback(p -> (next) -> p.wrapChildContextFn(info, next), fn); + } + + public T wrapOperationAttemptFn(AttemptInfo info, Supplier fn) { + if (plugins.isEmpty()) return fn.get(); + return runAsCallback(p -> (next) -> p.wrapOperationAttemptFn(info, next), fn); + } + + // ─── Utility hooks ─────────────────────────────────────────────────── + + /** Merges enrichLogContext results from all plugins. Later plugins override earlier ones for duplicate keys. */ + public Map enrichLogContext() { + if (plugins.isEmpty()) return Collections.emptyMap(); + + Map merged = new java.util.HashMap<>(); + for (var plugin : plugins) { + try { + var context = plugin.enrichLogContext(); + if (context != null) { + merged.putAll(context); + } + } catch (Exception e) { + logger.warn("Plugin enrichLogContext threw exception", e); + } + } + return merged; + } +} diff --git a/sdk/src/test/java/software/amazon/lambda/durable/plugin/PluginInfoConverterTest.java b/sdk/src/test/java/software/amazon/lambda/durable/plugin/PluginInfoConverterTest.java new file mode 100644 index 000000000..e28ab534f --- /dev/null +++ b/sdk/src/test/java/software/amazon/lambda/durable/plugin/PluginInfoConverterTest.java @@ -0,0 +1,254 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.plugin; + +import static org.junit.jupiter.api.Assertions.*; + +import java.time.Instant; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.lambda.model.Operation; +import software.amazon.awssdk.services.lambda.model.OperationStatus; +import software.amazon.awssdk.services.lambda.model.OperationType; +import software.amazon.lambda.durable.model.OperationSubType; + +class PluginInfoConverterTest { + + private static final String OPERATION_ID = "op-1"; + private static final String OPERATION_NAME = "validate-order"; + private static final String PARENT_ID = "parent-ctx"; + private static final Instant START = Instant.parse("2026-01-01T00:00:00Z"); + private static final Instant END = Instant.parse("2026-01-01T00:00:05Z"); + + // ─── toOperationInfo (from Operation) ──────────────────────────────── + + @Test + void toOperationInfo_fromOperation_mapsAllFields() { + var operation = Operation.builder() + .id(OPERATION_ID) + .name(OPERATION_NAME) + .type(OperationType.STEP) + .subType("WaitForCondition") + .parentId(PARENT_ID) + .status(OperationStatus.SUCCEEDED) + .startTimestamp(START) + .endTimestamp(END) + .build(); + + var info = PluginInfoConverter.toOperationInfo(operation); + + assertEquals(OPERATION_ID, info.id()); + assertEquals(OPERATION_NAME, info.name()); + assertEquals("STEP", info.type()); + assertEquals("WaitForCondition", info.subType()); + assertEquals(PARENT_ID, info.parentId()); + assertEquals(START, info.startTimestamp()); + assertEquals(END, info.endTimestamp()); + } + + @Test + void toOperationInfo_fromNullOperation_returnsAllNulls() { + var info = PluginInfoConverter.toOperationInfo(null); + + assertNull(info.id()); + assertNull(info.name()); + assertNull(info.type()); + assertNull(info.subType()); + assertNull(info.parentId()); + assertNull(info.startTimestamp()); + assertNull(info.endTimestamp()); + } + + @Test + void toOperationInfo_fromOperation_handlesNullFields() { + var operation = Operation.builder() + .id(OPERATION_ID) + .status(OperationStatus.STARTED) + .startTimestamp(START) + .build(); + + var info = PluginInfoConverter.toOperationInfo(operation); + + assertEquals(OPERATION_ID, info.id()); + assertNull(info.name()); + assertNull(info.type()); + assertNull(info.subType()); + assertNull(info.parentId()); + assertEquals(START, info.startTimestamp()); + assertNull(info.endTimestamp()); + } + + // ─── toOperationInfo (from explicit params) ────────────────────────── + + @Test + void toOperationInfo_fromParams_mapsAllFields() { + var operation = + Operation.builder().startTimestamp(START).endTimestamp(END).build(); + + var info = PluginInfoConverter.toOperationInfo( + operation, + OPERATION_ID, + OPERATION_NAME, + OperationType.STEP, + OperationSubType.WAIT_FOR_CONDITION, + PARENT_ID); + + assertEquals(OPERATION_ID, info.id()); + assertEquals(OPERATION_NAME, info.name()); + assertEquals("STEP", info.type()); + assertEquals("WaitForCondition", info.subType()); + assertEquals(PARENT_ID, info.parentId()); + assertEquals(START, info.startTimestamp()); + assertEquals(END, info.endTimestamp()); + } + + @Test + void toOperationInfo_fromParams_nullOperation_noTimestamps() { + var info = PluginInfoConverter.toOperationInfo( + null, OPERATION_ID, OPERATION_NAME, OperationType.WAIT, OperationSubType.WAIT, null); + + assertEquals(OPERATION_ID, info.id()); + assertEquals(OPERATION_NAME, info.name()); + assertEquals("WAIT", info.type()); + assertEquals("Wait", info.subType()); + assertNull(info.parentId()); + assertNull(info.startTimestamp()); + assertNull(info.endTimestamp()); + } + + @Test + void toOperationInfo_fromParams_nullSubType() { + var info = + PluginInfoConverter.toOperationInfo(null, OPERATION_ID, OPERATION_NAME, OperationType.STEP, null, null); + + assertNull(info.subType()); + } + + // ─── toOperationEndInfo ────────────────────────────────────────────── + + @Test + void toOperationEndInfo_mapsAllFields() { + var operation = + Operation.builder().startTimestamp(START).endTimestamp(END).build(); + var error = new RuntimeException("step failed"); + + var info = PluginInfoConverter.toOperationEndInfo( + operation, OPERATION_ID, OPERATION_NAME, OperationType.STEP, OperationSubType.STEP, PARENT_ID, error); + + assertEquals(OPERATION_ID, info.id()); + assertEquals(OPERATION_NAME, info.name()); + assertEquals("STEP", info.type()); + assertEquals("Step", info.subType()); + assertEquals(PARENT_ID, info.parentId()); + assertEquals(START, info.startTimestamp()); + assertEquals(END, info.endTimestamp()); + assertEquals(error, info.error()); + } + + @Test + void toOperationEndInfo_nullError_forSuccess() { + var operation = + Operation.builder().startTimestamp(START).endTimestamp(END).build(); + + var info = PluginInfoConverter.toOperationEndInfo( + operation, OPERATION_ID, OPERATION_NAME, OperationType.STEP, OperationSubType.STEP, null, null); + + assertNull(info.error()); + } + + // ─── toAttemptInfo ─────────────────────────────────────────────────── + + @Test + void toAttemptInfo_mapsAllFields() { + var info = PluginInfoConverter.toAttemptInfo( + OPERATION_ID, OPERATION_NAME, OperationType.STEP, OperationSubType.STEP, PARENT_ID, 3); + + assertEquals(OPERATION_ID, info.id()); + assertEquals(OPERATION_NAME, info.name()); + assertEquals("STEP", info.type()); + assertEquals("Step", info.subType()); + assertEquals(PARENT_ID, info.parentId()); + assertNotNull(info.startTimestamp()); + assertEquals(3, info.attempt()); + } + + @Test + void toAttemptInfo_nullSubType() { + var info = PluginInfoConverter.toAttemptInfo(OPERATION_ID, OPERATION_NAME, OperationType.STEP, null, null, 1); + + assertNull(info.subType()); + assertNull(info.parentId()); + assertEquals(1, info.attempt()); + } + + // ─── toAttemptEndInfo ──────────────────────────────────────────────── + + @Test + void toAttemptEndInfo_succeeded() { + var info = PluginInfoConverter.toAttemptEndInfo( + OPERATION_ID, + OPERATION_NAME, + OperationType.STEP, + OperationSubType.STEP, + PARENT_ID, + START, + 1, + AttemptOutcome.SUCCEEDED, + null, + null); + + assertEquals(OPERATION_ID, info.id()); + assertEquals(OPERATION_NAME, info.name()); + assertEquals("STEP", info.type()); + assertEquals("Step", info.subType()); + assertEquals(PARENT_ID, info.parentId()); + assertEquals(START, info.startTimestamp()); + assertNotNull(info.endTimestamp()); + assertEquals(1, info.attempt()); + assertEquals(AttemptOutcome.SUCCEEDED, info.outcome()); + assertNull(info.error()); + assertNull(info.nextAttemptDelaySeconds()); + } + + @Test + void toAttemptEndInfo_retrying() { + var error = new RuntimeException("transient failure"); + + var info = PluginInfoConverter.toAttemptEndInfo( + OPERATION_ID, + OPERATION_NAME, + OperationType.STEP, + OperationSubType.STEP, + null, + START, + 2, + AttemptOutcome.RETRYING, + error, + 5); + + assertEquals(AttemptOutcome.RETRYING, info.outcome()); + assertEquals(error, info.error()); + assertEquals(5, info.nextAttemptDelaySeconds()); + assertEquals(2, info.attempt()); + } + + @Test + void toAttemptEndInfo_failed() { + var error = new RuntimeException("permanent failure"); + + var info = PluginInfoConverter.toAttemptEndInfo( + OPERATION_ID, + OPERATION_NAME, + OperationType.STEP, + OperationSubType.STEP, + null, + START, + 3, + AttemptOutcome.FAILED, + error, + null); + + assertEquals(AttemptOutcome.FAILED, info.outcome()); + assertEquals(error, info.error()); + assertNull(info.nextAttemptDelaySeconds()); + } +} diff --git a/sdk/src/test/java/software/amazon/lambda/durable/plugin/PluginRunnerTest.java b/sdk/src/test/java/software/amazon/lambda/durable/plugin/PluginRunnerTest.java new file mode 100644 index 000000000..084e7ba6c --- /dev/null +++ b/sdk/src/test/java/software/amazon/lambda/durable/plugin/PluginRunnerTest.java @@ -0,0 +1,489 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.plugin; + +import static org.junit.jupiter.api.Assertions.*; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; +import org.junit.jupiter.api.Test; + +class PluginRunnerTest { + + // ─── No-op / empty behavior ────────────────────────────────────────── + + @Test + void noOpRunner_doesNothing() { + var runner = PluginRunner.noOp(); + + assertTrue(runner.isEmpty()); + assertDoesNotThrow(() -> runner.onExecutionStart(invocationInfo())); + assertDoesNotThrow(() -> runner.onInvocationEnd(invocationInfo())); + assertEquals("hello", runner.wrapInvocation(invocationInfo(), () -> "hello")); + assertEquals(Collections.emptyMap(), runner.enrichLogContext()); + } + + @Test + void emptyPluginList_behavesAsNoOp() { + var runner = new PluginRunner(List.of()); + + assertTrue(runner.isEmpty()); + assertEquals("result", runner.wrapOperationAttemptFn(attemptInfo(), () -> "result")); + } + + @Test + void nullPluginList_behavesAsNoOp() { + var runner = new PluginRunner(null); + + assertTrue(runner.isEmpty()); + assertDoesNotThrow(() -> runner.onOperationStart(operationInfo())); + } + + // ─── Fire-and-forget event hooks ───────────────────────────────────── + + @Test + void fireAndForget_callsAllPlugins() { + var calls = new ArrayList(); + var plugin1 = new TestPlugin("p1", calls); + var plugin2 = new TestPlugin("p2", calls); + var runner = new PluginRunner(List.of(plugin1, plugin2)); + + runner.onExecutionStart(invocationInfo()); + + assertEquals(List.of("p1:onExecutionStart", "p2:onExecutionStart"), calls); + } + + @Test + void fireAndForget_swallowsExceptions() { + var calls = new ArrayList(); + var throwingPlugin = new ThrowingPlugin(); + var normalPlugin = new TestPlugin("p2", calls); + var runner = new PluginRunner(List.of(throwingPlugin, normalPlugin)); + + assertDoesNotThrow(() -> runner.onInvocationStart(invocationInfo())); + assertEquals(List.of("p2:onInvocationStart"), calls); + } + + @Test + void fireAndForget_callsAllHookTypes() { + var calls = new ArrayList(); + var plugin = new TestPlugin("p", calls); + var runner = new PluginRunner(List.of(plugin)); + + runner.onExecutionStart(invocationInfo()); + runner.onInvocationStart(invocationInfo()); + runner.onOperationFirstStart(operationInfo()); + runner.onOperationStart(operationInfo()); + runner.onOperationFirstEnd(operationEndInfo()); + runner.onOperationAttemptStart(attemptInfo()); + runner.onOperationAttemptEnd(attemptEndInfo()); + runner.onOperationChange(operationChangeInfo()); + + assertEquals( + List.of( + "p:onExecutionStart", + "p:onInvocationStart", + "p:onOperationFirstStart", + "p:onOperationStart", + "p:onOperationFirstEnd", + "p:onOperationAttemptStart", + "p:onOperationAttemptEnd", + "p:onOperationChange"), + calls); + } + + // ─── Awaited hooks ─────────────────────────────────────────────────── + + @Test + void awaitedHooks_callAllPlugins() { + var calls = new ArrayList(); + var plugin1 = new TestPlugin("p1", calls); + var plugin2 = new TestPlugin("p2", calls); + var runner = new PluginRunner(List.of(plugin1, plugin2)); + + runner.onInvocationEnd(invocationInfo()); + runner.onExecutionEnd(executionEndInfo()); + + assertEquals( + List.of("p1:onInvocationEnd", "p2:onInvocationEnd", "p1:onExecutionEnd", "p2:onExecutionEnd"), calls); + } + + @Test + void awaitedHooks_swallowExceptions_butCallRemainingPlugins() { + var calls = new ArrayList(); + var throwingPlugin = new ThrowingPlugin(); + var normalPlugin = new TestPlugin("p2", calls); + var runner = new PluginRunner(List.of(throwingPlugin, normalPlugin)); + + assertDoesNotThrow(() -> runner.onInvocationEnd(invocationInfo())); + assertEquals(List.of("p2:onInvocationEnd"), calls); + } + + // ─── Wrap hooks ────────────────────────────────────────────────────── + + @Test + void wrapInvocation_executesFunction() { + var plugin = new TestPlugin("p", new ArrayList<>()); + var runner = new PluginRunner(List.of(plugin)); + + var result = runner.wrapInvocation(invocationInfo(), () -> "wrapped-result"); + + assertEquals("wrapped-result", result); + } + + @Test + void wrapInvocation_chainsMultiplePlugins() { + var order = new ArrayList(); + var plugin1 = new WrapTrackingPlugin("p1", order); + var plugin2 = new WrapTrackingPlugin("p2", order); + var runner = new PluginRunner(List.of(plugin1, plugin2)); + + var result = runner.wrapInvocation(invocationInfo(), () -> { + order.add("fn"); + return "result"; + }); + + // First plugin is outermost, wraps second, which wraps fn + assertEquals(List.of("p1:before", "p2:before", "fn", "p2:after", "p1:after"), order); + assertEquals("result", result); + } + + @Test + void wrapHook_userFunctionError_propagates() { + var plugin = new TestPlugin("p", new ArrayList<>()); + var runner = new PluginRunner(List.of(plugin)); + + var ex = assertThrows( + RuntimeException.class, + () -> runner.wrapInvocation(invocationInfo(), () -> { + throw new RuntimeException("user error"); + })); + + assertEquals("user error", ex.getMessage()); + } + + @Test + void wrapHook_pluginThrows_fallsBackToNextPlugin() { + var calls = new ArrayList(); + var throwingWrapPlugin = new ThrowingWrapPlugin(); + var normalPlugin = new WrapTrackingPlugin("p2", calls); + var runner = new PluginRunner(List.of(throwingWrapPlugin, normalPlugin)); + + var result = runner.wrapInvocation(invocationInfo(), () -> { + calls.add("fn"); + return "result"; + }); + + // Throwing plugin is skipped, normal plugin still wraps + assertEquals(List.of("p2:before", "fn", "p2:after"), calls); + assertEquals("result", result); + } + + @Test + void wrapChildContextFn_executesFunction() { + var runner = new PluginRunner(List.of(new TestPlugin("p", new ArrayList<>()))); + + var result = runner.wrapChildContextFn(operationInfo(), () -> 42); + + assertEquals(42, result); + } + + @Test + void wrapOperationAttemptFn_executesFunction() { + var runner = new PluginRunner(List.of(new TestPlugin("p", new ArrayList<>()))); + + var result = runner.wrapOperationAttemptFn(attemptInfo(), () -> "attempt-result"); + + assertEquals("attempt-result", result); + } + + @Test + void wrapHook_userFunctionError_notSwallowedByPlugin() { + // Plugin that catches and swallows exceptions + var swallowingPlugin = new DurableInstrumentationPlugin() { + @Override + public T wrapInvocation(InvocationInfo info, Supplier fn) { + try { + return fn.get(); + } catch (Exception e) { + return null; // swallow + } + } + }; + var runner = new PluginRunner(List.of(swallowingPlugin)); + + // Even though the plugin swallows, the PluginRunner re-throws the user error + var ex = assertThrows( + RuntimeException.class, + () -> runner.wrapInvocation(invocationInfo(), () -> { + throw new RuntimeException("user error"); + })); + + assertEquals("user error", ex.getMessage()); + } + + // ─── enrichLogContext ───────────────────────────────────────────────── + + @Test + void enrichLogContext_mergesFromAllPlugins() { + var plugin1 = new DurableInstrumentationPlugin() { + @Override + public Map enrichLogContext() { + return Map.of("traceId", "abc123", "spanId", "def456"); + } + }; + var plugin2 = new DurableInstrumentationPlugin() { + @Override + public Map enrichLogContext() { + return Map.of("customKey", "customValue"); + } + }; + var runner = new PluginRunner(List.of(plugin1, plugin2)); + + var context = runner.enrichLogContext(); + + assertEquals(3, context.size()); + assertEquals("abc123", context.get("traceId")); + assertEquals("def456", context.get("spanId")); + assertEquals("customValue", context.get("customKey")); + } + + @Test + void enrichLogContext_laterPluginOverridesEarlier() { + var plugin1 = new DurableInstrumentationPlugin() { + @Override + public Map enrichLogContext() { + return Map.of("key", "first"); + } + }; + var plugin2 = new DurableInstrumentationPlugin() { + @Override + public Map enrichLogContext() { + return Map.of("key", "second"); + } + }; + var runner = new PluginRunner(List.of(plugin1, plugin2)); + + var context = runner.enrichLogContext(); + + assertEquals("second", context.get("key")); + } + + @Test + void enrichLogContext_swallowsExceptions() { + var throwingPlugin = new DurableInstrumentationPlugin() { + @Override + public Map enrichLogContext() { + throw new RuntimeException("boom"); + } + }; + var normalPlugin = new DurableInstrumentationPlugin() { + @Override + public Map enrichLogContext() { + return Map.of("key", "value"); + } + }; + var runner = new PluginRunner(List.of(throwingPlugin, normalPlugin)); + + var context = runner.enrichLogContext(); + + assertEquals(Map.of("key", "value"), context); + } + + @Test + void enrichLogContext_handlesNullReturn() { + var nullPlugin = new DurableInstrumentationPlugin() { + @Override + public Map enrichLogContext() { + return null; + } + }; + var normalPlugin = new DurableInstrumentationPlugin() { + @Override + public Map enrichLogContext() { + return Map.of("key", "value"); + } + }; + var runner = new PluginRunner(List.of(nullPlugin, normalPlugin)); + + var context = runner.enrichLogContext(); + + assertEquals(Map.of("key", "value"), context); + } + + // ─── Thread safety (basic) ─────────────────────────────────────────── + + @Test + void pluginRunner_isImmutable() { + var calls = new ArrayList(); + var mutableList = new ArrayList(); + mutableList.add(new TestPlugin("p1", calls)); + var runner = new PluginRunner(mutableList); + + // Modifying the original list should not affect the runner + mutableList.add(new TestPlugin("p2", calls)); + + runner.onExecutionStart(invocationInfo()); + + // Only p1 should be called — p2 was added after construction + assertEquals(List.of("p1:onExecutionStart"), calls); + } + + // ─── Helper methods ────────────────────────────────────────────────── + + private static InvocationInfo invocationInfo() { + return new InvocationInfo("req-123", "arn:aws:lambda:us-east-1:123456789012:function:test"); + } + + private static OperationInfo operationInfo() { + return new OperationInfo("op-1", "test-step", "STEP", null, null, Instant.now(), null); + } + + private static OperationEndInfo operationEndInfo() { + return new OperationEndInfo("op-1", "test-step", "STEP", null, null, Instant.now(), Instant.now(), null); + } + + private static AttemptInfo attemptInfo() { + return new AttemptInfo("op-1", "test-step", "STEP", null, null, Instant.now(), 1); + } + + private static AttemptEndInfo attemptEndInfo() { + return new AttemptEndInfo( + "op-1", + "test-step", + "STEP", + null, + null, + Instant.now(), + Instant.now(), + 1, + AttemptOutcome.SUCCEEDED, + null, + null); + } + + private static OperationChangeInfo operationChangeInfo() { + return new OperationChangeInfo("req-123", "arn:test", Map.of(), Map.of()); + } + + private static ExecutionEndInfo executionEndInfo() { + return new ExecutionEndInfo("req-123", "arn:test", ExecutionStatus.SUCCEEDED, null, null, null, Map.of()); + } + + // ─── Test plugin implementations ───────────────────────────────────── + + /** Plugin that records which hooks were called. */ + private static class TestPlugin implements DurableInstrumentationPlugin { + private final String name; + private final List calls; + + TestPlugin(String name, List calls) { + this.name = name; + this.calls = calls; + } + + @Override + public void onExecutionStart(InvocationInfo info) { + calls.add(name + ":onExecutionStart"); + } + + @Override + public void onExecutionEnd(ExecutionEndInfo info) { + calls.add(name + ":onExecutionEnd"); + } + + @Override + public void onInvocationStart(InvocationInfo info) { + calls.add(name + ":onInvocationStart"); + } + + @Override + public void onInvocationEnd(InvocationInfo info) { + calls.add(name + ":onInvocationEnd"); + } + + @Override + public void onOperationFirstStart(OperationInfo info) { + calls.add(name + ":onOperationFirstStart"); + } + + @Override + public void onOperationStart(OperationInfo info) { + calls.add(name + ":onOperationStart"); + } + + @Override + public void onOperationFirstEnd(OperationEndInfo info) { + calls.add(name + ":onOperationFirstEnd"); + } + + @Override + public void onOperationAttemptStart(AttemptInfo info) { + calls.add(name + ":onOperationAttemptStart"); + } + + @Override + public void onOperationAttemptEnd(AttemptEndInfo info) { + calls.add(name + ":onOperationAttemptEnd"); + } + + @Override + public void onOperationChange(OperationChangeInfo info) { + calls.add(name + ":onOperationChange"); + } + } + + /** Plugin that throws on every hook. */ + private static class ThrowingPlugin implements DurableInstrumentationPlugin { + @Override + public void onExecutionStart(InvocationInfo info) { + throw new RuntimeException("boom"); + } + + @Override + public void onInvocationStart(InvocationInfo info) { + throw new RuntimeException("boom"); + } + + @Override + public void onInvocationEnd(InvocationInfo info) { + throw new RuntimeException("boom"); + } + + @Override + public void onExecutionEnd(ExecutionEndInfo info) { + throw new RuntimeException("boom"); + } + } + + /** Plugin that tracks wrap hook ordering. */ + private static class WrapTrackingPlugin implements DurableInstrumentationPlugin { + private final String name; + private final List order; + + WrapTrackingPlugin(String name, List order) { + this.name = name; + this.order = order; + } + + @Override + public T wrapInvocation(InvocationInfo info, Supplier fn) { + order.add(name + ":before"); + var result = fn.get(); + order.add(name + ":after"); + return result; + } + } + + /** Plugin whose wrap hook throws. */ + private static class ThrowingWrapPlugin implements DurableInstrumentationPlugin { + @Override + public T wrapInvocation(InvocationInfo info, Supplier fn) { + throw new RuntimeException("wrap boom"); + } + } +}