From 08ff785cce95be1a571e54d28f736b49f14c8d45 Mon Sep 17 00:00:00 2001 From: Edward Amsden Date: Tue, 2 Jun 2026 16:22:51 -0500 Subject: [PATCH 1/3] Lambda worker support for Java SDK --- settings.gradle | 2 ++ temporal-bom/build.gradle | 1 + 2 files changed, 3 insertions(+) diff --git a/settings.gradle b/settings.gradle index fe80370b0c..013f86cbe9 100644 --- a/settings.gradle +++ b/settings.gradle @@ -9,6 +9,8 @@ project(':temporal-opentracing').projectDir = file('contrib/temporal-opentracing include 'temporal-kotlin' include 'temporal-spring-ai' project(':temporal-spring-ai').projectDir = file('contrib/temporal-spring-ai') +include 'temporal-aws-lambda' +project(':temporal-aws-lambda').projectDir = file('contrib/temporal-aws-lambda') include 'temporal-spring-boot-autoconfigure' include 'temporal-spring-boot-starter' include 'temporal-remote-data-encoder' diff --git a/temporal-bom/build.gradle b/temporal-bom/build.gradle index e73d0d300e..0f4bf68eda 100644 --- a/temporal-bom/build.gradle +++ b/temporal-bom/build.gradle @@ -8,6 +8,7 @@ dependencies { constraints { api project(':temporal-kotlin') api project(':temporal-opentracing') + api project(':temporal-aws-lambda') api project(':temporal-remote-data-encoder') api project(':temporal-sdk') api project(':temporal-serviceclient') From d982c326fb8d5fc17105a048f368d6e00989db4f Mon Sep 17 00:00:00 2001 From: Edward Amsden Date: Thu, 4 Jun 2026 14:58:57 -0500 Subject: [PATCH 2/3] AWS Lambda integration --- contrib/temporal-aws-lambda/README.md | 56 ++ contrib/temporal-aws-lambda/build.gradle | 31 ++ .../lambda/DefaultLambdaWorkerRuntime.java | 127 +++++ .../io/temporal/aws/lambda/LambdaWorker.java | 226 ++++++++ .../aws/lambda/LambdaWorkerOptions.java | 497 ++++++++++++++++++ .../aws/lambda/LambdaWorkerRuntime.java | 28 + .../aws/lambda/OpenTelemetryFlushHook.java | 70 +++ .../lambda/OpenTelemetryStatsReporter.java | 157 ++++++ .../temporal/aws/lambda/OtelLambdaWorker.java | 335 ++++++++++++ .../temporal/aws/lambda/WorkerRegistrar.java | 27 + .../aws/lambda/LambdaWorkerLifecycleTest.java | 384 ++++++++++++++ .../aws/lambda/LambdaWorkerOptionsTest.java | 243 +++++++++ .../aws/lambda/OtelLambdaWorkerTest.java | 399 ++++++++++++++ .../aws/lambda/TestLambdaContext.java | 86 +++ 14 files changed, 2666 insertions(+) create mode 100644 contrib/temporal-aws-lambda/README.md create mode 100644 contrib/temporal-aws-lambda/build.gradle create mode 100644 contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/DefaultLambdaWorkerRuntime.java create mode 100644 contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/LambdaWorker.java create mode 100644 contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/LambdaWorkerOptions.java create mode 100644 contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/LambdaWorkerRuntime.java create mode 100644 contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/OpenTelemetryFlushHook.java create mode 100644 contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/OpenTelemetryStatsReporter.java create mode 100644 contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/OtelLambdaWorker.java create mode 100644 contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/WorkerRegistrar.java create mode 100644 contrib/temporal-aws-lambda/src/test/java/io/temporal/aws/lambda/LambdaWorkerLifecycleTest.java create mode 100644 contrib/temporal-aws-lambda/src/test/java/io/temporal/aws/lambda/LambdaWorkerOptionsTest.java create mode 100644 contrib/temporal-aws-lambda/src/test/java/io/temporal/aws/lambda/OtelLambdaWorkerTest.java create mode 100644 contrib/temporal-aws-lambda/src/test/java/io/temporal/aws/lambda/TestLambdaContext.java diff --git a/contrib/temporal-aws-lambda/README.md b/contrib/temporal-aws-lambda/README.md new file mode 100644 index 0000000000..905dd63632 --- /dev/null +++ b/contrib/temporal-aws-lambda/README.md @@ -0,0 +1,56 @@ +# Temporal AWS Lambda worker module + +This module provides a direct AWS Lambda Java handler for running a Temporal worker for one Lambda invocation. + +## Usage + +Add `temporal-aws-lambda` next to your Temporal SDK dependency, then expose the returned handler from your Lambda class: + +```java +import com.amazonaws.services.lambda.runtime.RequestHandler; +import io.temporal.aws.lambda.LambdaWorker; +import io.temporal.common.WorkerDeploymentVersion; + +public final class Handler { + public static final RequestHandler HANDLER = + LambdaWorker.run( + new WorkerDeploymentVersion("orders-worker", "2026-06-02"), + options -> + options + .setTaskQueue("orders") + .registerWorkflowImplementationTypes(OrderWorkflowImpl.class) + .registerActivitiesImplementations(new OrderActivitiesImpl())); +} +``` + +`TEMPORAL_TASK_QUEUE` can provide the task queue. If it is not set, call `setTaskQueue`. + +Connection options are loaded with `temporal-envconfig` when the handler is constructed during Lambda cold start. The Lambda worker checks `TEMPORAL_CONFIG_FILE` first, then readable `$LAMBDA_TASK_ROOT/temporal.toml`, then readable `./temporal.toml`, then falls back to the envconfig defaults and Temporal environment variables. The `configure` callback also runs during handler construction, so non-invocation configuration is prepared once and reused. + +If you need to assemble options outside the `run` callback, call `LambdaWorkerOptions.fromEnvironment()`, mutate the returned options, and pass them to `LambdaWorker.newHandler(...)`. + +The handler creates one worker per invocation, starts the worker, shuts it down before the Lambda deadline, runs shutdown hooks in order, and closes service stubs. Worker deployment versioning is always enabled for the supplied `WorkerDeploymentVersion`. If neither client nor worker identity is set by the user, each invocation uses `@` as the Temporal identity. + +`shutdownDeadlineBuffer` is the full shutdown window reserved at the end of the Lambda invocation. The default is 7 seconds: 5 seconds for `gracefulShutdownTimeout` and 2 seconds for hooks and service stubs. The worker runs until `remainingTime - shutdownDeadlineBuffer`, then stops and awaits termination for `gracefulShutdownTimeout`. If you change `gracefulShutdownTimeout` without explicitly setting `shutdownDeadlineBuffer`, the buffer is recomputed as `gracefulShutdownTimeout + 2s`. + +## OpenTelemetry + +`OtelLambdaWorker.configure(options)` creates an OpenTelemetry SDK with OTLP metric and trace exporters by default, uses AWS X-Ray-compatible trace ID generation, installs an OpenTelemetry-backed Tally metrics scope, configures tracing through the SDK OpenTracing interceptor path, and registers a per-invocation flush hook. + +```java +public static final RequestHandler HANDLER = + LambdaWorker.run( + new WorkerDeploymentVersion("orders-worker", "2026-06-02"), + options -> { + OtelLambdaWorker.configure(options); + options + .setTaskQueue("orders") + .registerWorkflowImplementationTypes(OrderWorkflowImpl.class); + }); +``` + +The helper defaults the OTLP endpoint from `OTEL_EXPORTER_OTLP_ENDPOINT`, then `http://localhost:4317`. It defaults the service name from `OTEL_SERVICE_NAME`, then `AWS_LAMBDA_FUNCTION_NAME`, then `temporal-lambda-worker`, and sets it on the OpenTelemetry resource. To use an application-owned provider, call `builder.setOpenTelemetry(...)`; in that path, no exporters are created and the helper only installs the metrics scope, interceptors, and per-invocation flush hook. Providers and scopes are not closed after each invocation. + +Use `OtelLambdaWorker.configureMetrics(...)`, `OtelLambdaWorker.configureTracing(...)`, and `OtelLambdaWorker.configureFlushHook(...)` when you want to compose metrics, tracing, or provider flushing separately around an application-owned OpenTelemetry instance. + +For Java logging, this module depends on `slf4j-api` only. It does not bundle a runtime logging binding, so Lambda log formatting remains owned by the application. diff --git a/contrib/temporal-aws-lambda/build.gradle b/contrib/temporal-aws-lambda/build.gradle new file mode 100644 index 0000000000..7a3f685f3b --- /dev/null +++ b/contrib/temporal-aws-lambda/build.gradle @@ -0,0 +1,31 @@ +description = '''Temporal Java SDK AWS Lambda Worker Support Module''' + +ext { + awsLambdaJavaCoreVersion = '1.4.0' + otelVersion = '1.25.0' + otShimVersion = "${otelVersion}-alpha" +} + +dependencies { + api platform("io.opentelemetry:opentelemetry-bom:$otelVersion") + + // This module shouldn't carry temporal-sdk with it, especially for situations when users may + // be using a shaded artifact. + compileOnly project(':temporal-sdk') + + api "com.amazonaws:aws-lambda-java-core:$awsLambdaJavaCoreVersion" + + implementation project(':temporal-envconfig') + implementation project(':temporal-opentracing') + implementation "io.opentelemetry:opentelemetry-api" + implementation "io.opentelemetry.contrib:opentelemetry-aws-xray:$otelVersion" + implementation "io.opentelemetry:opentelemetry-exporter-otlp" + implementation "io.opentelemetry:opentelemetry-opentracing-shim:$otShimVersion" + implementation "io.opentelemetry:opentelemetry-sdk" + implementation "org.slf4j:slf4j-api:$slf4jVersion" + + testImplementation project(':temporal-sdk') + testImplementation "junit:junit:${junitVersion}" + + testRuntimeOnly group: 'ch.qos.logback', name: 'logback-classic', version: "${logbackVersion}" +} diff --git a/contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/DefaultLambdaWorkerRuntime.java b/contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/DefaultLambdaWorkerRuntime.java new file mode 100644 index 0000000000..1855fff432 --- /dev/null +++ b/contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/DefaultLambdaWorkerRuntime.java @@ -0,0 +1,127 @@ +package io.temporal.aws.lambda; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.common.converter.EncodedValues; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.serviceclient.WorkflowServiceStubsOptions; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; +import io.temporal.worker.WorkerFactoryOptions; +import io.temporal.worker.WorkerOptions; +import io.temporal.worker.WorkflowImplementationOptions; +import io.temporal.workflow.Functions; +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +final class DefaultLambdaWorkerRuntime implements LambdaWorkerRuntime { + @Override + public Invocation create( + WorkflowServiceStubsOptions serviceStubsOptions, + WorkflowClientOptions clientOptions, + WorkerFactoryOptions workerFactoryOptions, + String taskQueue, + WorkerOptions workerOptions) { + WorkflowServiceStubs stubs = WorkflowServiceStubs.newServiceStubs(serviceStubsOptions); + try { + WorkflowClient client = WorkflowClient.newInstance(stubs, clientOptions); + WorkerFactory factory = WorkerFactory.newInstance(client, workerFactoryOptions); + Worker worker = factory.newWorker(taskQueue, workerOptions); + return new DefaultInvocation(stubs, factory, worker); + } catch (RuntimeException e) { + stubs.shutdownNow(); + throw e; + } + } + + private static final class DefaultInvocation implements Invocation { + private final WorkflowServiceStubs stubs; + private final WorkerFactory factory; + private final WorkerRegistrar registrar; + + private DefaultInvocation(WorkflowServiceStubs stubs, WorkerFactory factory, Worker worker) { + this.stubs = stubs; + this.factory = factory; + this.registrar = new DefaultWorkerRegistrar(worker); + } + + @Override + public WorkerRegistrar getWorkerRegistrar() { + return registrar; + } + + @Override + public void start() { + factory.start(); + } + + @Override + public void shutdown() { + factory.shutdown(); + } + + @Override + public void awaitTermination(Duration timeout) { + factory.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS); + } + + @Override + public void closeStubs(Duration timeout) { + stubs.shutdown(); + if (!stubs.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) { + stubs.shutdownNow(); + } + } + } + + private static final class DefaultWorkerRegistrar implements WorkerRegistrar { + private final Worker worker; + + private DefaultWorkerRegistrar(Worker worker) { + this.worker = worker; + } + + @Override + public void registerWorkflowImplementationTypes(Class... workflowImplementationClasses) { + worker.registerWorkflowImplementationTypes(workflowImplementationClasses); + } + + @Override + public void registerWorkflowImplementationTypes( + WorkflowImplementationOptions options, Class... workflowImplementationClasses) { + worker.registerWorkflowImplementationTypes(options, workflowImplementationClasses); + } + + @Override + public void registerWorkflowImplementationFactory( + Class workflowInterface, Functions.Func factory) { + worker.registerWorkflowImplementationFactory(workflowInterface, factory); + } + + @Override + public void registerWorkflowImplementationFactory( + Class workflowInterface, + Functions.Func1 factory, + WorkflowImplementationOptions options) { + worker.registerWorkflowImplementationFactory(workflowInterface, factory, options); + } + + @Override + public void registerWorkflowImplementationFactory( + Class workflowInterface, + Functions.Func factory, + WorkflowImplementationOptions options) { + worker.registerWorkflowImplementationFactory(workflowInterface, factory, options); + } + + @Override + public void registerActivitiesImplementations(Object... activityImplementations) { + worker.registerActivitiesImplementations(activityImplementations); + } + + @Override + public void registerNexusServiceImplementation(Object... nexusServiceImplementations) { + worker.registerNexusServiceImplementation(nexusServiceImplementations); + } + } +} diff --git a/contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/LambdaWorker.java b/contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/LambdaWorker.java new file mode 100644 index 0000000000..1224783f84 --- /dev/null +++ b/contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/LambdaWorker.java @@ -0,0 +1,226 @@ +package io.temporal.aws.lambda; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import io.temporal.common.WorkerDeploymentVersion; +import java.io.IOException; +import java.time.Duration; +import java.util.Objects; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Creates AWS Lambda handlers that run one Temporal worker per Lambda invocation. */ +public final class LambdaWorker { + private static final Logger log = LoggerFactory.getLogger(LambdaWorker.class); + + private static final Duration MINIMUM_AVAILABLE_RUNTIME = Duration.ofSeconds(1); + private static final Duration LOW_AVAILABLE_RUNTIME_WARNING = Duration.ofSeconds(5); + + private LambdaWorker() {} + + /** + * Returns an AWS Lambda Java handler that creates, starts, and shuts down one Temporal worker per + * invocation. + * + * @param version worker deployment version to advertise for this worker. + * @param configure callback invoked once while the Lambda handler is constructed. + */ + public static RequestHandler run( + WorkerDeploymentVersion version, Consumer configure) { + LambdaWorkerOptions.validateVersion(version); + Objects.requireNonNull(configure, "configure"); + try { + LambdaWorkerOptions options = LambdaWorkerOptions.fromEnvironment(System.getenv()); + configure.accept(options); + return newHandler(version, options); + } catch (IOException e) { + throw new RuntimeException("Unable to load Temporal client configuration", e); + } + } + + /** Returns an AWS Lambda Java handler using already-configured Lambda worker options. */ + public static RequestHandler newHandler( + WorkerDeploymentVersion version, LambdaWorkerOptions options) { + return newHandler(version, options, new DefaultLambdaWorkerRuntime(), sleep()); + } + + static RequestHandler newHandler( + WorkerDeploymentVersion version, + LambdaWorkerOptions options, + LambdaWorkerRuntime runtime, + Sleeper sleeper) { + return new Handler( + Objects.requireNonNull(options, "options").prepare(version), + Objects.requireNonNull(runtime, "runtime"), + Objects.requireNonNull(sleeper, "sleeper")); + } + + private static Sleeper sleep() { + return duration -> Thread.sleep(duration.toMillis()); + } + + interface Sleeper { + void sleep(Duration duration) throws InterruptedException; + } + + private static final class Handler implements RequestHandler { + private final LambdaWorkerOptions.Prepared preparedOptions; + private final LambdaWorkerRuntime runtime; + private final Sleeper sleeper; + + private Handler( + LambdaWorkerOptions.Prepared preparedOptions, + LambdaWorkerRuntime runtime, + Sleeper sleeper) { + this.preparedOptions = Objects.requireNonNull(preparedOptions, "preparedOptions"); + this.runtime = runtime; + this.sleeper = sleeper; + } + + @Override + public Void handleRequest(Object input, Context context) { + Objects.requireNonNull(context, "context"); + + LambdaWorkerOptions.Materialized options = preparedOptions.materialize(identityFor(context)); + validateRemainingTime(context, options.shutdownDeadlineBuffer); + + LambdaWorkerRuntime.Invocation invocation = null; + try { + invocation = + runtime.create( + options.serviceStubsOptions, + options.clientOptions, + options.workerFactoryOptions, + options.taskQueue, + options.workerOptions); + + for (LambdaWorkerOptions.Registration registration : options.registrations) { + registration.apply(invocation.getWorkerRegistrar()); + } + + invocation.start(); + log.info( + "Temporal Lambda worker started awsRequestId={} invokedFunctionArn={} taskQueue={} identity={}", + context.getAwsRequestId(), + context.getInvokedFunctionArn(), + options.taskQueue, + options.workerOptions.getIdentity()); + + sleepUntilShutdownWindow(context, options); + return null; + } finally { + shutdownInvocation(context, invocation, options); + } + } + + private void sleepUntilShutdownWindow( + Context context, LambdaWorkerOptions.Materialized options) { + Duration runDuration = durationUntilShutdownWindow(context, options); + if (runDuration.isZero() || runDuration.isNegative()) { + return; + } + + try { + sleeper.sleep(runDuration); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while running Temporal Lambda worker", e); + } + } + + private void shutdownInvocation( + Context context, + LambdaWorkerRuntime.Invocation invocation, + LambdaWorkerOptions.Materialized options) { + if (invocation != null) { + try { + invocation.shutdown(); + invocation.awaitTermination(options.gracefulShutdownTimeout); + log.info( + "Temporal Lambda worker stopped awsRequestId={} invokedFunctionArn={} taskQueue={}", + context.getAwsRequestId(), + context.getInvokedFunctionArn(), + options.taskQueue); + } catch (RuntimeException e) { + log.error( + "Temporal Lambda worker shutdown failed awsRequestId={} invokedFunctionArn={} taskQueue={}", + context.getAwsRequestId(), + context.getInvokedFunctionArn(), + options.taskQueue, + e); + } + } + + runShutdownHooks(context, options); + + if (invocation != null) { + try { + invocation.closeStubs(stubsCloseTimeout(options)); + } catch (RuntimeException e) { + log.error( + "Temporal Lambda worker service stubs close failed awsRequestId={} invokedFunctionArn={} taskQueue={}", + context.getAwsRequestId(), + context.getInvokedFunctionArn(), + options.taskQueue, + e); + } + } + } + + private void runShutdownHooks(Context context, LambdaWorkerOptions.Materialized options) { + for (Runnable hook : options.shutdownHooks) { + try { + hook.run(); + } catch (RuntimeException e) { + log.error( + "Temporal Lambda worker shutdown hook failed awsRequestId={} invokedFunctionArn={} taskQueue={}", + context.getAwsRequestId(), + context.getInvokedFunctionArn(), + options.taskQueue, + e); + } + } + } + + private Duration durationUntilShutdownWindow( + Context context, LambdaWorkerOptions.Materialized options) { + return Duration.ofMillis(context.getRemainingTimeInMillis()) + .minus(options.shutdownDeadlineBuffer); + } + + private Duration stubsCloseTimeout(LambdaWorkerOptions.Materialized options) { + Duration timeout = options.shutdownDeadlineBuffer.minus(options.gracefulShutdownTimeout); + return timeout.isNegative() ? Duration.ZERO : timeout; + } + + private void validateRemainingTime(Context context, Duration shutdownDeadlineBuffer) { + Duration available = + Duration.ofMillis(context.getRemainingTimeInMillis()).minus(shutdownDeadlineBuffer); + if (available.compareTo(MINIMUM_AVAILABLE_RUNTIME) <= 0) { + throw new IllegalStateException( + "Insufficient Lambda invocation time remaining after shutdown buffer: " + + available.toMillis() + + "ms"); + } + if (available.compareTo(LOW_AVAILABLE_RUNTIME_WARNING) < 0) { + log.warn( + "Temporal Lambda worker has low remaining time awsRequestId={} invokedFunctionArn={} availableRuntimeMs={} shutdownDeadlineBufferMs={}", + context.getAwsRequestId(), + context.getInvokedFunctionArn(), + available.toMillis(), + shutdownDeadlineBuffer.toMillis()); + } + } + + private static String identityFor(Context context) { + return emptyToUnknown(context.getAwsRequestId()) + + "@" + + emptyToUnknown(context.getInvokedFunctionArn()); + } + + private static String emptyToUnknown(String value) { + return value == null || value.isEmpty() ? "unknown" : value; + } + } +} diff --git a/contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/LambdaWorkerOptions.java b/contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/LambdaWorkerOptions.java new file mode 100644 index 0000000000..8e1d019e77 --- /dev/null +++ b/contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/LambdaWorkerOptions.java @@ -0,0 +1,497 @@ +package io.temporal.aws.lambda; + +import io.temporal.client.WorkflowClientOptions; +import io.temporal.common.VersioningBehavior; +import io.temporal.common.WorkerDeploymentVersion; +import io.temporal.common.converter.EncodedValues; +import io.temporal.envconfig.ClientConfigProfile; +import io.temporal.envconfig.LoadClientConfigProfileOptions; +import io.temporal.serviceclient.WorkflowServiceStubsOptions; +import io.temporal.worker.WorkerDeploymentOptions; +import io.temporal.worker.WorkerFactoryOptions; +import io.temporal.worker.WorkerOptions; +import io.temporal.worker.WorkflowImplementationOptions; +import io.temporal.workflow.Functions; +import java.io.File; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Configuration for Temporal workers running inside AWS Lambda invocations. + * + *

Instances are configured during handler construction and then copied for each invocation so + * invocation identity can be added without rerunning user configuration. + */ +public final class LambdaWorkerOptions { + public static final String TEMPORAL_TASK_QUEUE = "TEMPORAL_TASK_QUEUE"; + public static final String TEMPORAL_CONFIG_FILE = "TEMPORAL_CONFIG_FILE"; + public static final String LAMBDA_TASK_ROOT = "LAMBDA_TASK_ROOT"; + + static final Duration DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = Duration.ofSeconds(5); + static final Duration DEFAULT_SHUTDOWN_HOOKS_AND_STUBS_TIMEOUT = Duration.ofSeconds(2); + static final Duration DEFAULT_SHUTDOWN_DEADLINE_BUFFER = + DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT.plus(DEFAULT_SHUTDOWN_HOOKS_AND_STUBS_TIMEOUT); + + private static final int DEFAULT_MAX_CONCURRENT_ACTIVITY_EXECUTION_SIZE = 2; + private static final int DEFAULT_MAX_CONCURRENT_WORKFLOW_TASK_EXECUTION_SIZE = 10; + private static final int DEFAULT_MAX_CONCURRENT_LOCAL_ACTIVITY_EXECUTION_SIZE = 2; + private static final int DEFAULT_MAX_CONCURRENT_NEXUS_EXECUTION_SIZE = 5; + private static final int DEFAULT_MAX_CONCURRENT_WORKFLOW_TASK_POLLERS = 2; + private static final int DEFAULT_MAX_CONCURRENT_ACTIVITY_TASK_POLLERS = 1; + private static final int DEFAULT_MAX_CONCURRENT_NEXUS_TASK_POLLERS = 1; + private static final int DEFAULT_WORKFLOW_CACHE_SIZE = 30; + private static final int DEFAULT_MAX_WORKFLOW_THREAD_COUNT = 30; + + private final WorkflowServiceStubsOptions.Builder workflowServiceStubsOptionsBuilder; + private final WorkflowClientOptions.Builder workflowClientOptionsBuilder; + private final WorkerFactoryOptions.Builder workerFactoryOptionsBuilder; + private final WorkerOptions.Builder workerOptionsBuilder; + private final List registrations = new ArrayList<>(); + private final List shutdownHooks = new ArrayList<>(); + + private String taskQueue; + private Duration gracefulShutdownTimeout = DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT; + private Duration shutdownDeadlineBuffer = DEFAULT_SHUTDOWN_DEADLINE_BUFFER; + private boolean shutdownDeadlineBufferExplicit; + + private LambdaWorkerOptions(ClientConfigProfile profile, Map env) { + this.workflowServiceStubsOptionsBuilder = + WorkflowServiceStubsOptions.newBuilder(profile.toWorkflowServiceStubsOptions()); + this.workflowClientOptionsBuilder = + WorkflowClientOptions.newBuilder(profile.toWorkflowClientOptions()); + this.workerFactoryOptionsBuilder = WorkerFactoryOptions.newBuilder(); + this.workerOptionsBuilder = WorkerOptions.newBuilder(); + this.taskQueue = nonEmptyEnv(env, TEMPORAL_TASK_QUEUE); + } + + /** Loads Temporal client configuration from the process environment. */ + public static LambdaWorkerOptions fromEnvironment() throws IOException { + return fromEnvironment(System.getenv()); + } + + /** Loads Temporal client configuration from the provided environment values. */ + public static LambdaWorkerOptions fromEnvironment(Map env) throws IOException { + return fromEnvironment(env, new File(".")); + } + + static LambdaWorkerOptions fromEnvironment(Map env, File cwd) throws IOException { + ClientConfigProfile profile = + ClientConfigProfile.load( + LoadClientConfigProfileOptions.newBuilder() + .setConfigFilePath(resolveConfigFilePath(env, cwd)) + .setEnvOverrides(env) + .build()); + return new LambdaWorkerOptions(profile, env); + } + + static String resolveConfigFilePath(Map env) { + return resolveConfigFilePath(env, new File(".")); + } + + static String resolveConfigFilePath(Map env, File cwd) { + String configured = nonEmptyEnv(env, TEMPORAL_CONFIG_FILE); + if (configured != null) { + return configured; + } + + String taskRoot = nonEmptyEnv(env, LAMBDA_TASK_ROOT); + if (taskRoot != null) { + File lambdaConfig = new File(taskRoot, "temporal.toml"); + if (isReadableFile(lambdaConfig)) { + return lambdaConfig.getAbsolutePath(); + } + } + + File cwdConfig = new File(Objects.requireNonNull(cwd, "cwd"), "temporal.toml"); + return isReadableFile(cwdConfig) ? cwdConfig.getAbsolutePath() : null; + } + + /** Returns the builder used to prepare {@link WorkflowServiceStubsOptions}. */ + public WorkflowServiceStubsOptions.Builder getWorkflowServiceStubsOptionsBuilder() { + return workflowServiceStubsOptionsBuilder; + } + + /** Returns the builder used to prepare {@link WorkflowClientOptions}. */ + public WorkflowClientOptions.Builder getWorkflowClientOptionsBuilder() { + return workflowClientOptionsBuilder; + } + + /** Returns the builder used to prepare {@link WorkerFactoryOptions}. */ + public WorkerFactoryOptions.Builder getWorkerFactoryOptionsBuilder() { + return workerFactoryOptionsBuilder; + } + + /** Returns the builder used to prepare {@link WorkerOptions}. */ + public WorkerOptions.Builder getWorkerOptionsBuilder() { + return workerOptionsBuilder; + } + + public String getTaskQueue() { + return taskQueue; + } + + /** Sets the Temporal task queue polled by the per-invocation worker. */ + public LambdaWorkerOptions setTaskQueue(String taskQueue) { + this.taskQueue = taskQueue; + return this; + } + + public Duration getGracefulShutdownTimeout() { + return gracefulShutdownTimeout; + } + + /** + * Sets how long worker shutdown waits for pollers and executions to stop. + * + *

If {@link #setShutdownDeadlineBuffer(Duration)} has not been called, the shutdown deadline + * buffer is recomputed as this timeout plus a 2 second hook and service stubs margin. + */ + public LambdaWorkerOptions setGracefulShutdownTimeout(Duration gracefulShutdownTimeout) { + this.gracefulShutdownTimeout = + requireNonNegative(gracefulShutdownTimeout, "gracefulShutdownTimeout"); + if (!shutdownDeadlineBufferExplicit) { + shutdownDeadlineBuffer = + this.gracefulShutdownTimeout.plus(DEFAULT_SHUTDOWN_HOOKS_AND_STUBS_TIMEOUT); + } + return this; + } + + public Duration getShutdownDeadlineBuffer() { + return shutdownDeadlineBuffer; + } + + /** + * Sets the full shutdown window reserved at the end of the Lambda invocation. + * + *

The worker stops when remaining invocation time reaches this buffer. The default is 7 + * seconds, made up of the 5 second graceful shutdown timeout and a 2 second hook and service + * stubs margin. + */ + public LambdaWorkerOptions setShutdownDeadlineBuffer(Duration shutdownDeadlineBuffer) { + this.shutdownDeadlineBuffer = + requireNonNegative(shutdownDeadlineBuffer, "shutdownDeadlineBuffer"); + shutdownDeadlineBufferExplicit = true; + return this; + } + + public LambdaWorkerOptions registerWorkflowImplementationTypes( + Class... workflowImplementationClasses) { + final Class[] classes = copyClasses(workflowImplementationClasses); + registrations.add(registrar -> registrar.registerWorkflowImplementationTypes(classes)); + return this; + } + + public LambdaWorkerOptions registerWorkflowImplementationTypes( + WorkflowImplementationOptions options, Class... workflowImplementationClasses) { + Objects.requireNonNull(options, "options"); + final Class[] classes = copyClasses(workflowImplementationClasses); + registrations.add(registrar -> registrar.registerWorkflowImplementationTypes(options, classes)); + return this; + } + + public LambdaWorkerOptions registerWorkflowImplementationFactory( + Class workflowInterface, Functions.Func factory) { + Objects.requireNonNull(workflowInterface, "workflowInterface"); + Objects.requireNonNull(factory, "factory"); + registrations.add( + registrar -> registrar.registerWorkflowImplementationFactory(workflowInterface, factory)); + return this; + } + + public LambdaWorkerOptions registerWorkflowImplementationFactory( + Class workflowInterface, + Functions.Func factory, + WorkflowImplementationOptions options) { + Objects.requireNonNull(workflowInterface, "workflowInterface"); + Objects.requireNonNull(factory, "factory"); + Objects.requireNonNull(options, "options"); + registrations.add( + registrar -> + registrar.registerWorkflowImplementationFactory(workflowInterface, factory, options)); + return this; + } + + public LambdaWorkerOptions registerWorkflowImplementationFactory( + Class workflowInterface, + Functions.Func1 factory, + WorkflowImplementationOptions options) { + Objects.requireNonNull(workflowInterface, "workflowInterface"); + Objects.requireNonNull(factory, "factory"); + Objects.requireNonNull(options, "options"); + registrations.add( + registrar -> + registrar.registerWorkflowImplementationFactory(workflowInterface, factory, options)); + return this; + } + + public LambdaWorkerOptions registerActivitiesImplementations(Object... activityImplementations) { + final Object[] implementations = + copyObjects(activityImplementations, "activityImplementations"); + registrations.add(registrar -> registrar.registerActivitiesImplementations(implementations)); + return this; + } + + public LambdaWorkerOptions registerNexusServiceImplementation( + Object... nexusServiceImplementations) { + final Object[] implementations = + copyObjects(nexusServiceImplementations, "nexusServiceImplementations"); + registrations.add(registrar -> registrar.registerNexusServiceImplementation(implementations)); + return this; + } + + /** Adds a shutdown hook that runs after the worker has stopped and before service stubs close. */ + public LambdaWorkerOptions addShutdownHook(Runnable hook) { + shutdownHooks.add(Objects.requireNonNull(hook, "hook")); + return this; + } + + Materialized materialize(WorkerDeploymentVersion version, String invocationIdentity) { + return prepare(version).materialize(invocationIdentity); + } + + Prepared prepare(WorkerDeploymentVersion version) { + validateVersion(version); + if (isNullOrEmpty(taskQueue)) { + throw new IllegalStateException( + "Task queue must be set with LambdaWorkerOptions#setTaskQueue or TEMPORAL_TASK_QUEUE"); + } + + WorkflowClientOptions rawClientOptions = workflowClientOptionsBuilder.build(); + WorkerOptions rawWorkerOptions = workerOptionsBuilder.build(); + WorkerFactoryOptions rawFactoryOptions = workerFactoryOptionsBuilder.build(); + + WorkflowClientOptions.Builder clientOptionsBuilder = + WorkflowClientOptions.newBuilder(rawClientOptions); + WorkerOptions.Builder workerOptionsBuilder = WorkerOptions.newBuilder(rawWorkerOptions); + WorkerFactoryOptions.Builder factoryOptionsBuilder = + WorkerFactoryOptions.newBuilder(rawFactoryOptions); + + if (rawClientOptions.getIdentity() == null && rawWorkerOptions.getIdentity() != null) { + clientOptionsBuilder.setIdentity(rawWorkerOptions.getIdentity()); + } else if (rawWorkerOptions.getIdentity() == null && rawClientOptions.getIdentity() != null) { + workerOptionsBuilder.setIdentity(rawClientOptions.getIdentity()); + } + + applyLambdaWorkerDefaults(rawWorkerOptions, workerOptionsBuilder, version); + applyLambdaFactoryDefaults(rawFactoryOptions, factoryOptionsBuilder); + + return new Prepared( + workflowServiceStubsOptionsBuilder.validateAndBuildWithDefaults(), + clientOptionsBuilder.build(), + factoryOptionsBuilder.validateAndBuildWithDefaults(), + taskQueue, + workerOptionsBuilder.validateAndBuildWithDefaults(), + gracefulShutdownTimeout, + shutdownDeadlineBuffer, + new ArrayList<>(registrations), + new ArrayList<>(shutdownHooks)); + } + + private static void applyLambdaWorkerDefaults( + WorkerOptions rawOptions, WorkerOptions.Builder builder, WorkerDeploymentVersion version) { + if (rawOptions.getWorkerTuner() == null) { + if (rawOptions.getMaxConcurrentActivityExecutionSize() == 0) { + builder.setMaxConcurrentActivityExecutionSize( + DEFAULT_MAX_CONCURRENT_ACTIVITY_EXECUTION_SIZE); + } + if (rawOptions.getMaxConcurrentWorkflowTaskExecutionSize() == 0) { + builder.setMaxConcurrentWorkflowTaskExecutionSize( + DEFAULT_MAX_CONCURRENT_WORKFLOW_TASK_EXECUTION_SIZE); + } + if (rawOptions.getMaxConcurrentLocalActivityExecutionSize() == 0) { + builder.setMaxConcurrentLocalActivityExecutionSize( + DEFAULT_MAX_CONCURRENT_LOCAL_ACTIVITY_EXECUTION_SIZE); + } + if (rawOptions.getMaxConcurrentNexusExecutionSize() == 0) { + builder.setMaxConcurrentNexusExecutionSize(DEFAULT_MAX_CONCURRENT_NEXUS_EXECUTION_SIZE); + } + } + + if (rawOptions.getWorkflowTaskPollersBehavior() == null + && rawOptions.getMaxConcurrentWorkflowTaskPollers() == 0) { + builder.setMaxConcurrentWorkflowTaskPollers(DEFAULT_MAX_CONCURRENT_WORKFLOW_TASK_POLLERS); + } + if (rawOptions.getActivityTaskPollersBehavior() == null + && rawOptions.getMaxConcurrentActivityTaskPollers() == 0) { + builder.setMaxConcurrentActivityTaskPollers(DEFAULT_MAX_CONCURRENT_ACTIVITY_TASK_POLLERS); + } + if (rawOptions.getNexusTaskPollersBehavior() == null + && rawOptions.getMaxConcurrentNexusTaskPollers() == 0) { + builder.setMaxConcurrentNexusTaskPollers(DEFAULT_MAX_CONCURRENT_NEXUS_TASK_POLLERS); + } + + builder.setDisableEagerExecution(true); + builder.setDeploymentOptions( + forcedDeploymentOptions(rawOptions.getDeploymentOptions(), version)); + } + + private static void applyLambdaFactoryDefaults( + WorkerFactoryOptions rawOptions, WorkerFactoryOptions.Builder builder) { + if (rawOptions.getWorkflowCacheSize() == 0) { + builder.setWorkflowCacheSize(DEFAULT_WORKFLOW_CACHE_SIZE); + } + if (rawOptions.getMaxWorkflowThreadCount() == 0) { + builder.setMaxWorkflowThreadCount(DEFAULT_MAX_WORKFLOW_THREAD_COUNT); + } + } + + private static WorkerDeploymentOptions forcedDeploymentOptions( + WorkerDeploymentOptions existing, WorkerDeploymentVersion version) { + VersioningBehavior behavior = VersioningBehavior.PINNED; + if (existing != null + && existing.getDefaultVersioningBehavior() != VersioningBehavior.UNSPECIFIED) { + behavior = existing.getDefaultVersioningBehavior(); + } + + return WorkerDeploymentOptions.newBuilder() + .setUseVersioning(true) + .setVersion(version) + .setDefaultVersioningBehavior(behavior) + .build(); + } + + static void validateVersion(WorkerDeploymentVersion version) { + Objects.requireNonNull(version, "version"); + if (isNullOrEmpty(version.getDeploymentName())) { + throw new IllegalArgumentException("Worker deployment name must be non-empty"); + } + if (isNullOrEmpty(version.getBuildId())) { + throw new IllegalArgumentException("Worker deployment build ID must be non-empty"); + } + } + + private static Duration requireNonNegative(Duration value, String name) { + Objects.requireNonNull(value, name); + if (value.isNegative()) { + throw new IllegalArgumentException(name + " must not be negative"); + } + return value; + } + + private static boolean isReadableFile(File file) { + return file.isFile() && file.canRead(); + } + + private static String nonEmptyEnv(Map env, String name) { + if (env == null) { + return null; + } + String value = env.get(name); + return isNullOrEmpty(value) ? null : value; + } + + private static boolean isNullOrEmpty(String value) { + return value == null || value.trim().isEmpty(); + } + + private static Class[] copyClasses(Class... classes) { + Objects.requireNonNull(classes, "classes"); + for (Class workflowImplementationClass : classes) { + Objects.requireNonNull(workflowImplementationClass, "workflowImplementationClass"); + } + return Arrays.copyOf(classes, classes.length); + } + + private static Object[] copyObjects(Object[] objects, String name) { + Objects.requireNonNull(objects, name); + return Arrays.copyOf(objects, objects.length); + } + + interface Registration { + void apply(WorkerRegistrar registrar); + } + + static final class Materialized { + final WorkflowServiceStubsOptions serviceStubsOptions; + final WorkflowClientOptions clientOptions; + final WorkerFactoryOptions workerFactoryOptions; + final String taskQueue; + final WorkerOptions workerOptions; + final Duration gracefulShutdownTimeout; + final Duration shutdownDeadlineBuffer; + final List registrations; + final List shutdownHooks; + + private Materialized( + WorkflowServiceStubsOptions serviceStubsOptions, + WorkflowClientOptions clientOptions, + WorkerFactoryOptions workerFactoryOptions, + String taskQueue, + WorkerOptions workerOptions, + Duration gracefulShutdownTimeout, + Duration shutdownDeadlineBuffer, + List registrations, + List shutdownHooks) { + this.serviceStubsOptions = serviceStubsOptions; + this.clientOptions = clientOptions; + this.workerFactoryOptions = workerFactoryOptions; + this.taskQueue = taskQueue; + this.workerOptions = workerOptions; + this.gracefulShutdownTimeout = gracefulShutdownTimeout; + this.shutdownDeadlineBuffer = shutdownDeadlineBuffer; + this.registrations = Collections.unmodifiableList(registrations); + this.shutdownHooks = Collections.unmodifiableList(shutdownHooks); + } + } + + static final class Prepared { + private final WorkflowServiceStubsOptions serviceStubsOptions; + private final WorkflowClientOptions clientOptions; + private final WorkerFactoryOptions workerFactoryOptions; + private final String taskQueue; + private final WorkerOptions workerOptions; + private final Duration gracefulShutdownTimeout; + private final Duration shutdownDeadlineBuffer; + private final List registrations; + private final List shutdownHooks; + + private Prepared( + WorkflowServiceStubsOptions serviceStubsOptions, + WorkflowClientOptions clientOptions, + WorkerFactoryOptions workerFactoryOptions, + String taskQueue, + WorkerOptions workerOptions, + Duration gracefulShutdownTimeout, + Duration shutdownDeadlineBuffer, + List registrations, + List shutdownHooks) { + this.serviceStubsOptions = serviceStubsOptions; + this.clientOptions = clientOptions; + this.workerFactoryOptions = workerFactoryOptions; + this.taskQueue = taskQueue; + this.workerOptions = workerOptions; + this.gracefulShutdownTimeout = gracefulShutdownTimeout; + this.shutdownDeadlineBuffer = shutdownDeadlineBuffer; + this.registrations = Collections.unmodifiableList(registrations); + this.shutdownHooks = Collections.unmodifiableList(shutdownHooks); + } + + Materialized materialize(String invocationIdentity) { + WorkflowClientOptions.Builder clientOptionsBuilder = + WorkflowClientOptions.newBuilder(clientOptions); + WorkerOptions.Builder workerOptionsBuilder = WorkerOptions.newBuilder(workerOptions); + + if (clientOptions.getIdentity() == null && workerOptions.getIdentity() == null) { + clientOptionsBuilder.setIdentity(invocationIdentity); + workerOptionsBuilder.setIdentity(invocationIdentity); + } + + return new Materialized( + serviceStubsOptions, + clientOptionsBuilder.validateAndBuildWithDefaults(), + workerFactoryOptions, + taskQueue, + workerOptionsBuilder.validateAndBuildWithDefaults(), + gracefulShutdownTimeout, + shutdownDeadlineBuffer, + new ArrayList<>(registrations), + new ArrayList<>(shutdownHooks)); + } + } +} diff --git a/contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/LambdaWorkerRuntime.java b/contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/LambdaWorkerRuntime.java new file mode 100644 index 0000000000..50c2f9a368 --- /dev/null +++ b/contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/LambdaWorkerRuntime.java @@ -0,0 +1,28 @@ +package io.temporal.aws.lambda; + +import io.temporal.client.WorkflowClientOptions; +import io.temporal.serviceclient.WorkflowServiceStubsOptions; +import io.temporal.worker.WorkerFactoryOptions; +import io.temporal.worker.WorkerOptions; +import java.time.Duration; + +interface LambdaWorkerRuntime { + Invocation create( + WorkflowServiceStubsOptions serviceStubsOptions, + WorkflowClientOptions clientOptions, + WorkerFactoryOptions workerFactoryOptions, + String taskQueue, + WorkerOptions workerOptions); + + interface Invocation { + WorkerRegistrar getWorkerRegistrar(); + + void start(); + + void shutdown(); + + void awaitTermination(Duration timeout); + + void closeStubs(Duration timeout); + } +} diff --git a/contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/OpenTelemetryFlushHook.java b/contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/OpenTelemetryFlushHook.java new file mode 100644 index 0000000000..b9c718dc86 --- /dev/null +++ b/contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/OpenTelemetryFlushHook.java @@ -0,0 +1,70 @@ +package io.temporal.aws.lambda; + +import io.opentelemetry.api.OpenTelemetry; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.time.Duration; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class OpenTelemetryFlushHook implements Runnable { + private static final Logger log = LoggerFactory.getLogger(OpenTelemetryFlushHook.class); + + private final OpenTelemetry openTelemetry; + private final Duration timeout; + + OpenTelemetryFlushHook(OpenTelemetry openTelemetry, Duration timeout) { + this.openTelemetry = Objects.requireNonNull(openTelemetry, "openTelemetry"); + this.timeout = Objects.requireNonNull(timeout, "timeout"); + } + + @Override + public void run() { + forceFlush(openTelemetry.getTracerProvider()); + forceFlush(openTelemetry.getMeterProvider()); + } + + private void forceFlush(Object provider) { + if (provider == null) { + return; + } + + try { + Method forceFlush = provider.getClass().getMethod("forceFlush"); + Object result = forceFlush.invoke(provider); + join(result); + } catch (NoSuchMethodException e) { + // The OpenTelemetry API no-op providers do not expose forceFlush. + } catch (IllegalAccessException | InvocationTargetException | RuntimeException e) { + log.warn("OpenTelemetry forceFlush failed provider={}", provider.getClass().getName(), e); + } + } + + private void join(Object result) { + if (result == null) { + return; + } + + try { + Method join = result.getClass().getMethod("join", long.class, TimeUnit.class); + join.invoke(result, timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (NoSuchMethodException e) { + tryJoinMillis(result); + } catch (IllegalAccessException | InvocationTargetException | RuntimeException e) { + log.warn("OpenTelemetry forceFlush join failed result={}", result.getClass().getName(), e); + } + } + + private void tryJoinMillis(Object result) { + try { + Method join = result.getClass().getMethod("join", long.class); + join.invoke(result, timeout.toMillis()); + } catch (NoSuchMethodException e) { + // Some forceFlush result implementations do not expose a blocking join method. + } catch (IllegalAccessException | InvocationTargetException | RuntimeException e) { + log.warn("OpenTelemetry forceFlush join failed result={}", result.getClass().getName(), e); + } + } +} diff --git a/contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/OpenTelemetryStatsReporter.java b/contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/OpenTelemetryStatsReporter.java new file mode 100644 index 0000000000..3e5e83a326 --- /dev/null +++ b/contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/OpenTelemetryStatsReporter.java @@ -0,0 +1,157 @@ +package io.temporal.aws.lambda; + +import com.uber.m3.tally.Capabilities; +import com.uber.m3.tally.CapableOf; +import com.uber.m3.tally.StatsReporter; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableDoubleGauge; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; + +final class OpenTelemetryStatsReporter implements StatsReporter { + private final Meter meter; + private final String serviceName; + private final ConcurrentMap counters = new ConcurrentHashMap<>(); + private final ConcurrentMap timers = new ConcurrentHashMap<>(); + private final ConcurrentMap gauges = new ConcurrentHashMap<>(); + + OpenTelemetryStatsReporter(OpenTelemetry openTelemetry, String serviceName) { + this.meter = Objects.requireNonNull(openTelemetry, "openTelemetry").getMeter("io.temporal"); + this.serviceName = Objects.requireNonNull(serviceName, "serviceName"); + } + + @Override + public Capabilities capabilities() { + return CapableOf.REPORTING; + } + + @Override + public void flush() { + // OpenTelemetry SDK flushing is handled by OtelLambdaWorker's shutdown hook. + } + + @Override + public void close() { + flush(); + } + + @Override + public void reportCounter(String name, Map tags, long value) { + LongCounter counter = counters.computeIfAbsent(name, key -> meter.counterBuilder(key).build()); + counter.add(value, attributes(tags)); + } + + @Override + public void reportGauge(String name, Map tags, double value) { + MetricKey key = new MetricKey(name, tags); + GaugeHolder holder = + gauges.computeIfAbsent( + key, + metricKey -> { + AtomicReference current = new AtomicReference<>(0.0); + ObservableDoubleGauge gauge = + meter + .gaugeBuilder(metricKey.name) + .buildWithCallback( + measurement -> + measurement.record(current.get(), attributes(metricKey.tags))); + return new GaugeHolder(current, gauge); + }); + holder.current.set(value); + } + + @Override + public void reportTimer( + String name, Map tags, com.uber.m3.util.Duration interval) { + DoubleHistogram timer = + timers.computeIfAbsent(name, key -> meter.histogramBuilder(key).setUnit("ms").build()); + timer.record(interval.getNanos() / 1_000_000.0, attributes(tags)); + } + + @Override + @SuppressWarnings("deprecation") + public void reportHistogramValueSamples( + String name, + Map tags, + com.uber.m3.tally.Buckets buckets, + double bucketLowerBound, + double bucketUpperBound, + long samples) { + // Tally reports pre-aggregated bucket samples, while the OpenTelemetry API records raw values. + } + + @Override + @SuppressWarnings("deprecation") + public void reportHistogramDurationSamples( + String name, + Map tags, + com.uber.m3.tally.Buckets buckets, + com.uber.m3.util.Duration bucketLowerBound, + com.uber.m3.util.Duration bucketUpperBound, + long samples) { + // Tally reports pre-aggregated bucket samples, while the OpenTelemetry API records raw values. + } + + private Attributes attributes(Map tags) { + AttributesBuilder builder = Attributes.builder(); + builder.put("service.name", serviceName); + if (tags != null) { + for (Map.Entry entry : tags.entrySet()) { + if (entry.getKey() != null && entry.getValue() != null) { + builder.put(entry.getKey(), entry.getValue()); + } + } + } + return builder.build(); + } + + private static final class GaugeHolder { + private final AtomicReference current; + + @SuppressWarnings("unused") + private final ObservableDoubleGauge gauge; + + private GaugeHolder(AtomicReference current, ObservableDoubleGauge gauge) { + this.current = current; + this.gauge = gauge; + } + } + + private static final class MetricKey { + private final String name; + private final Map tags; + + private MetricKey(String name, Map tags) { + this.name = Objects.requireNonNull(name, "name"); + this.tags = + tags == null ? Collections.emptyMap() : Collections.unmodifiableMap(new HashMap<>(tags)); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + MetricKey metricKey = (MetricKey) o; + return name.equals(metricKey.name) && tags.equals(metricKey.tags); + } + + @Override + public int hashCode() { + return Objects.hash(name, tags); + } + } +} diff --git a/contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/OtelLambdaWorker.java b/contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/OtelLambdaWorker.java new file mode 100644 index 0000000000..3641985499 --- /dev/null +++ b/contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/OtelLambdaWorker.java @@ -0,0 +1,335 @@ +package io.temporal.aws.lambda; + +import com.uber.m3.tally.RootScopeBuilder; +import com.uber.m3.tally.Scope; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.contrib.awsxray.AwsXrayIdGenerator; +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter; +import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter; +import io.opentelemetry.opentracingshim.OpenTracingShim; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.common.interceptors.WorkerInterceptor; +import io.temporal.common.interceptors.WorkflowClientInterceptor; +import io.temporal.opentracing.OpenTracingClientInterceptor; +import io.temporal.opentracing.OpenTracingOptions; +import io.temporal.opentracing.OpenTracingWorkerInterceptor; +import io.temporal.worker.WorkerFactoryOptions; +import java.time.Duration; +import java.util.Arrays; +import java.util.Map; +import java.util.Objects; +import java.util.function.Consumer; + +/** OpenTelemetry helper for {@link LambdaWorker}. */ +public final class OtelLambdaWorker { + public static final String OTEL_EXPORTER_OTLP_ENDPOINT = "OTEL_EXPORTER_OTLP_ENDPOINT"; + public static final String OTEL_SERVICE_NAME = "OTEL_SERVICE_NAME"; + public static final String AWS_LAMBDA_FUNCTION_NAME = "AWS_LAMBDA_FUNCTION_NAME"; + public static final String DEFAULT_OTLP_ENDPOINT = "http://localhost:4317"; + public static final String DEFAULT_SERVICE_NAME = "temporal-lambda-worker"; + + private static final AttributeKey SERVICE_NAME_ATTRIBUTE = + AttributeKey.stringKey("service.name"); + + private OtelLambdaWorker() {} + + public static Builder newBuilder() { + return new Builder(System.getenv()); + } + + static Builder newBuilder(Map env) { + return new Builder(env); + } + + public static String getDefaultEndpoint() { + return resolveEndpoint(System.getenv()); + } + + public static String getDefaultServiceName() { + return resolveServiceName(System.getenv()); + } + + public static void configure(LambdaWorkerOptions options) { + configure(options, builder -> {}); + } + + /** + * Configures metrics, tracing interceptors, and per-invocation flushing for a Lambda worker. + * + *

By default this method creates an OpenTelemetry SDK with OTLP trace and metric exporters and + * AWS X-Ray-compatible trace ID generation. If {@link Builder#setOpenTelemetry(OpenTelemetry)} is + * used, the provided instance is used instead and exporters are not created. + */ + public static void configure(LambdaWorkerOptions options, Consumer configure) { + Objects.requireNonNull(options, "options"); + Builder builder = newBuilder(); + Objects.requireNonNull(configure, "configure").accept(builder); + builder.apply(options); + } + + /** + * Configures Temporal metrics with the default service name and reporting interval. + * + *

This helper only installs the metrics scope. It does not configure tracing interceptors or + * register a flush hook. + */ + public static void configureMetrics(LambdaWorkerOptions options, OpenTelemetry openTelemetry) { + configureMetrics(options, openTelemetry, getDefaultServiceName(), Duration.ofSeconds(1)); + } + + /** + * Configures Temporal metrics with an application-owned OpenTelemetry provider. + * + *

This helper only installs the metrics scope. It does not configure tracing interceptors or + * register a flush hook. + */ + public static void configureMetrics( + LambdaWorkerOptions options, + OpenTelemetry openTelemetry, + String serviceName, + Duration reportInterval) { + Objects.requireNonNull(options, "options"); + OpenTelemetryStatsReporter reporter = + new OpenTelemetryStatsReporter( + Objects.requireNonNull(openTelemetry, "openTelemetry"), + Objects.requireNonNull(serviceName, "serviceName")); + Scope scope = + new RootScopeBuilder() + .reporter(reporter) + .reportEvery( + com.uber.m3.util.Duration.ofMillis( + requirePositive(reportInterval, "reportInterval").toMillis())); + options.getWorkflowServiceStubsOptionsBuilder().setMetricsScope(scope); + } + + /** + * Configures Temporal tracing interceptors with an application-owned OpenTelemetry provider. + * + *

This helper only installs tracing interceptors. It does not configure metrics or register a + * flush hook. + */ + public static void configureTracing(LambdaWorkerOptions options, OpenTelemetry openTelemetry) { + Objects.requireNonNull(options, "options"); + OpenTracingOptions tracingOptions = + OpenTracingOptions.newBuilder() + .setTracer( + OpenTracingShim.createTracerShim( + Objects.requireNonNull(openTelemetry, "openTelemetry"))) + .build(); + appendClientInterceptor(options, new OpenTracingClientInterceptor(tracingOptions)); + appendWorkerInterceptor(options, new OpenTracingWorkerInterceptor(tracingOptions)); + } + + /** + * Registers a per-invocation OpenTelemetry force-flush hook. + * + *

This helper only registers the flush hook. It does not configure metrics or tracing. + */ + public static void configureFlushHook( + LambdaWorkerOptions options, OpenTelemetry openTelemetry, Duration flushTimeout) { + Objects.requireNonNull(options, "options"); + options.addShutdownHook( + new OpenTelemetryFlushHook( + Objects.requireNonNull(openTelemetry, "openTelemetry"), + requireNonNegative(flushTimeout, "flushTimeout"))); + } + + static String resolveEndpoint(Map env) { + String endpoint = nonEmptyEnv(env, OTEL_EXPORTER_OTLP_ENDPOINT); + return endpoint == null ? DEFAULT_OTLP_ENDPOINT : endpoint; + } + + static String resolveServiceName(Map env) { + String serviceName = nonEmptyEnv(env, OTEL_SERVICE_NAME); + if (serviceName != null) { + return serviceName; + } + serviceName = nonEmptyEnv(env, AWS_LAMBDA_FUNCTION_NAME); + return serviceName == null ? DEFAULT_SERVICE_NAME : serviceName; + } + + public static final class Builder { + private final Map env; + private OpenTelemetry openTelemetry; + private String endpoint; + private String serviceName; + private Duration metricsReportInterval = Duration.ofSeconds(1); + private Duration flushTimeout = Duration.ofSeconds(10); + private Runnable flushHook; + private TelemetryFactory telemetryFactory = new DefaultTelemetryFactory(); + + private Builder(Map env) { + this.env = Objects.requireNonNull(env, "env"); + } + + /** + * Uses an application-owned OpenTelemetry instance instead of creating an SDK and exporters. + */ + public Builder setOpenTelemetry(OpenTelemetry openTelemetry) { + this.openTelemetry = Objects.requireNonNull(openTelemetry, "openTelemetry"); + return this; + } + + /** Sets the OTLP metric and trace exporter endpoint used by the default SDK setup. */ + public Builder setEndpoint(String endpoint) { + this.endpoint = Objects.requireNonNull(endpoint, "endpoint"); + return this; + } + + /** Sets the service name used by the default SDK resource and Temporal metrics reporter. */ + public Builder setServiceName(String serviceName) { + this.serviceName = Objects.requireNonNull(serviceName, "serviceName"); + return this; + } + + /** Sets the interval used by the Tally metrics scope and periodic metric reader. */ + public Builder setMetricsReportInterval(Duration metricsReportInterval) { + this.metricsReportInterval = requirePositive(metricsReportInterval, "metricsReportInterval"); + return this; + } + + /** Sets how long the per-invocation OpenTelemetry flush hook waits for provider flushing. */ + public Builder setFlushTimeout(Duration flushTimeout) { + this.flushTimeout = requireNonNegative(flushTimeout, "flushTimeout"); + return this; + } + + /** Overrides the per-invocation flush hook. */ + public Builder setFlushHook(Runnable flushHook) { + this.flushHook = Objects.requireNonNull(flushHook, "flushHook"); + return this; + } + + public String getEndpoint() { + return endpoint == null ? resolveEndpoint(env) : endpoint; + } + + public String getServiceName() { + return serviceName == null ? resolveServiceName(env) : serviceName; + } + + Builder setTelemetryFactory(TelemetryFactory telemetryFactory) { + this.telemetryFactory = Objects.requireNonNull(telemetryFactory, "telemetryFactory"); + return this; + } + + OpenTelemetry createOpenTelemetry() { + return telemetryFactory.create( + getEndpoint(), getServiceName(), metricsReportInterval, flushTimeout); + } + + void apply(LambdaWorkerOptions options) { + OpenTelemetry resolvedOpenTelemetry = + openTelemetry == null ? createOpenTelemetry() : openTelemetry; + configureMetrics(options, resolvedOpenTelemetry, getServiceName(), metricsReportInterval); + configureTracing(options, resolvedOpenTelemetry); + if (flushHook == null) { + configureFlushHook(options, resolvedOpenTelemetry, flushTimeout); + } else { + options.addShutdownHook(flushHook); + } + } + } + + interface TelemetryFactory { + OpenTelemetry create( + String endpoint, String serviceName, Duration metricsReportInterval, Duration flushTimeout); + } + + private static final class DefaultTelemetryFactory implements TelemetryFactory { + @Override + public OpenTelemetry create( + String endpoint, + String serviceName, + Duration metricsReportInterval, + Duration flushTimeout) { + Resource resource = + Resource.getDefault() + .merge(Resource.create(Attributes.of(SERVICE_NAME_ATTRIBUTE, serviceName))); + MetricExporter metricExporter = + OtlpGrpcMetricExporter.builder().setEndpoint(endpoint).build(); + SpanExporter spanExporter = OtlpGrpcSpanExporter.builder().setEndpoint(endpoint).build(); + + SdkMeterProvider meterProvider = + SdkMeterProvider.builder() + .setResource(resource) + .registerMetricReader( + PeriodicMetricReader.builder(metricExporter) + .setInterval(metricsReportInterval) + .build()) + .build(); + SdkTracerProvider tracerProvider = + SdkTracerProvider.builder() + .setResource(resource) + .setIdGenerator(AwsXrayIdGenerator.getInstance()) + .addSpanProcessor( + BatchSpanProcessor.builder(spanExporter).setExporterTimeout(flushTimeout).build()) + .build(); + + return OpenTelemetrySdk.builder() + .setMeterProvider(meterProvider) + .setTracerProvider(tracerProvider) + .build(); + } + } + + private static String nonEmptyEnv(Map env, String name) { + if (env == null) { + return null; + } + String value = env.get(name); + return value == null || value.trim().isEmpty() ? null : value; + } + + private static void appendClientInterceptor( + LambdaWorkerOptions options, WorkflowClientInterceptor interceptor) { + WorkflowClientOptions raw = options.getWorkflowClientOptionsBuilder().build(); + WorkflowClientInterceptor[] existing = raw.getInterceptors(); + int existingLength = existing == null ? 0 : existing.length; + WorkflowClientInterceptor[] interceptors = + existingLength == 0 + ? new WorkflowClientInterceptor[1] + : Arrays.copyOf(existing, existingLength + 1); + interceptors[existingLength] = interceptor; + options.getWorkflowClientOptionsBuilder().setInterceptors(interceptors); + } + + private static void appendWorkerInterceptor( + LambdaWorkerOptions options, WorkerInterceptor interceptor) { + WorkerFactoryOptions raw = options.getWorkerFactoryOptionsBuilder().build(); + WorkerInterceptor[] existing = raw.getWorkerInterceptors(); + int existingLength = existing == null ? 0 : existing.length; + WorkerInterceptor[] interceptors = + existingLength == 0 + ? new WorkerInterceptor[1] + : Arrays.copyOf(existing, existingLength + 1); + interceptors[existingLength] = interceptor; + options.getWorkerFactoryOptionsBuilder().setWorkerInterceptors(interceptors); + } + + private static Duration requirePositive(Duration value, String name) { + Objects.requireNonNull(value, name); + if (value.isZero() || value.isNegative()) { + throw new IllegalArgumentException(name + " must be positive"); + } + return value; + } + + private static Duration requireNonNegative(Duration value, String name) { + Objects.requireNonNull(value, name); + if (value.isNegative()) { + throw new IllegalArgumentException(name + " must not be negative"); + } + return value; + } +} diff --git a/contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/WorkerRegistrar.java b/contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/WorkerRegistrar.java new file mode 100644 index 0000000000..53a47ce6a4 --- /dev/null +++ b/contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/WorkerRegistrar.java @@ -0,0 +1,27 @@ +package io.temporal.aws.lambda; + +import io.temporal.common.converter.EncodedValues; +import io.temporal.worker.WorkflowImplementationOptions; +import io.temporal.workflow.Functions; + +interface WorkerRegistrar { + void registerWorkflowImplementationTypes(Class... workflowImplementationClasses); + + void registerWorkflowImplementationTypes( + WorkflowImplementationOptions options, Class... workflowImplementationClasses); + + void registerWorkflowImplementationFactory( + Class workflowInterface, Functions.Func factory); + + void registerWorkflowImplementationFactory( + Class workflowInterface, + Functions.Func1 factory, + WorkflowImplementationOptions options); + + void registerWorkflowImplementationFactory( + Class workflowInterface, Functions.Func factory, WorkflowImplementationOptions options); + + void registerActivitiesImplementations(Object... activityImplementations); + + void registerNexusServiceImplementation(Object... nexusServiceImplementations); +} diff --git a/contrib/temporal-aws-lambda/src/test/java/io/temporal/aws/lambda/LambdaWorkerLifecycleTest.java b/contrib/temporal-aws-lambda/src/test/java/io/temporal/aws/lambda/LambdaWorkerLifecycleTest.java new file mode 100644 index 0000000000..e31cf20259 --- /dev/null +++ b/contrib/temporal-aws-lambda/src/test/java/io/temporal/aws/lambda/LambdaWorkerLifecycleTest.java @@ -0,0 +1,384 @@ +package io.temporal.aws.lambda; + +import static org.junit.Assert.*; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.common.WorkerDeploymentVersion; +import io.temporal.serviceclient.WorkflowServiceStubsOptions; +import io.temporal.worker.WorkerFactoryOptions; +import io.temporal.worker.WorkerOptions; +import io.temporal.worker.WorkflowImplementationOptions; +import io.temporal.workflow.Functions; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class LambdaWorkerLifecycleTest { + private static final WorkerDeploymentVersion VERSION = + new WorkerDeploymentVersion("deployment", "build"); + + @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Test + public void invocationLifecycleRunsInOrder() { + FakeRuntime runtime = new FakeRuntime(); + RequestHandler handler = + handler( + options -> + options + .setTaskQueue("task-queue") + .registerWorkflowImplementationTypes(TestWorkflowImpl.class) + .registerActivitiesImplementations(new Object()) + .registerNexusServiceImplementation(new Object()) + .addShutdownHook(() -> runtime.events.add("hook-1")) + .addShutdownHook(() -> runtime.events.add("hook-2")), + runtime, + duration -> runtime.events.add("sleep:" + duration.toMillis())); + + handler.handleRequest(null, context(20_000)); + + assertEquals( + events( + "create", + "registerWorkflowTypes:1", + "registerActivities:1", + "registerNexus:1", + "start", + "sleep:13000", + "shutdown", + "await:5000", + "hook-1", + "hook-2", + "close:2000"), + runtime.events); + } + + @Test + public void configureRunsOnceAndRuntimeIsCreatedOncePerInvocation() { + AtomicInteger configureCount = new AtomicInteger(); + FakeRuntime runtime = new FakeRuntime(); + RequestHandler handler = + handler( + options -> { + configureCount.incrementAndGet(); + options.setTaskQueue("task-queue"); + }, + runtime); + + assertEquals(1, configureCount.get()); + + handler.handleRequest(null, context(20_000, "request-1", "function-arn-1")); + assertEquals("request-1@function-arn-1", runtime.clientOptions.getIdentity()); + assertEquals("request-1@function-arn-1", runtime.workerOptions.getIdentity()); + + handler.handleRequest(null, context(20_000, "request-2", "function-arn-2")); + assertEquals("request-2@function-arn-2", runtime.clientOptions.getIdentity()); + assertEquals("request-2@function-arn-2", runtime.workerOptions.getIdentity()); + + assertEquals(1, configureCount.get()); + assertEquals(2, runtime.createCount); + } + + @Test + public void missingTaskQueueFailsDuringHandlerConstruction() { + FakeRuntime runtime = new FakeRuntime(); + + assertThrows(IllegalStateException.class, () -> handler(options -> {}, runtime)); + + assertEquals(0, runtime.createCount); + } + + @Test + public void envconfigValuesAreVisibleToConfigureBeforeInvocation() throws IOException { + File config = temporaryFolder.newFile("temporal.toml"); + Files.write( + config.toPath(), + "[profile.default]\nnamespace = \"configured\"\n".getBytes(StandardCharsets.UTF_8)); + Map env = new HashMap<>(); + env.put(LambdaWorkerOptions.TEMPORAL_CONFIG_FILE, config.getAbsolutePath()); + AtomicReference namespaceInConfigure = new AtomicReference<>(); + FakeRuntime runtime = new FakeRuntime(); + + RequestHandler handler = + handler( + env, + options -> { + namespaceInConfigure.set( + options.getWorkflowClientOptionsBuilder().build().getNamespace()); + options.setTaskQueue("task-queue"); + }, + runtime, + duration -> {}); + + assertEquals("configured", namespaceInConfigure.get()); + + handler.handleRequest(null, context(20_000)); + + assertEquals("configured", runtime.clientOptions.getNamespace()); + } + + @Test + public void identityUsesAwsRequestIdAndFunctionArn() { + FakeRuntime runtime = new FakeRuntime(); + RequestHandler handler = + handler(options -> options.setTaskQueue("task-queue"), runtime); + + handler.handleRequest(null, context(20_000)); + + assertEquals("request-id@function-arn", runtime.clientOptions.getIdentity()); + assertEquals("request-id@function-arn", runtime.workerOptions.getIdentity()); + } + + @Test + public void userProvidedClientIdentityIsPreserved() { + FakeRuntime runtime = new FakeRuntime(); + RequestHandler handler = + handler( + options -> { + options.setTaskQueue("task-queue"); + options.getWorkflowClientOptionsBuilder().setIdentity("custom-client"); + }, + runtime); + + handler.handleRequest(null, context(20_000)); + + assertEquals("custom-client", runtime.clientOptions.getIdentity()); + assertEquals("custom-client", runtime.workerOptions.getIdentity()); + } + + @Test + public void userProvidedWorkerIdentityIsPreserved() { + FakeRuntime runtime = new FakeRuntime(); + RequestHandler handler = + handler( + options -> { + options.setTaskQueue("task-queue"); + options.getWorkerOptionsBuilder().setIdentity("custom-worker"); + }, + runtime); + + handler.handleRequest(null, context(20_000)); + + assertEquals("custom-worker", runtime.workerOptions.getIdentity()); + assertEquals("custom-worker", runtime.clientOptions.getIdentity()); + } + + @Test + public void insufficientRemainingTimeThrowsBeforeRuntimeCreation() { + FakeRuntime runtime = new FakeRuntime(); + RequestHandler handler = + handler(options -> options.setTaskQueue("task-queue"), runtime); + + assertThrows(IllegalStateException.class, () -> handler.handleRequest(null, context(8_000))); + + assertEquals(0, runtime.createCount); + } + + @Test + public void lowRemainingTimeStillStartsAndSleepsUntilShutdownBuffer() { + FakeRuntime runtime = new FakeRuntime(); + RequestHandler handler = + handler( + options -> options.setTaskQueue("task-queue"), + runtime, + duration -> runtime.events.add("sleep:" + duration.toMillis())); + + handler.handleRequest(null, context(10_500)); + + assertTrue(runtime.events.contains("start")); + assertTrue(runtime.events.contains("sleep:3500")); + } + + @Test + public void shutdownHookErrorsDoNotSkipLaterHooks() { + FakeRuntime runtime = new FakeRuntime(); + RequestHandler handler = + handler( + options -> + options + .setTaskQueue("task-queue") + .addShutdownHook( + () -> { + runtime.events.add("hook-1"); + throw new RuntimeException("hook failed"); + }) + .addShutdownHook(() -> runtime.events.add("hook-2")), + runtime); + + handler.handleRequest(null, context(20_000)); + + assertTrue(runtime.events.contains("hook-1")); + assertTrue(runtime.events.contains("hook-2")); + assertEquals("close:2000", runtime.events.get(runtime.events.size() - 1)); + } + + private RequestHandler handler( + java.util.function.Consumer configure, FakeRuntime runtime) { + return handler(configure, runtime, duration -> {}); + } + + private RequestHandler handler( + java.util.function.Consumer configure, + FakeRuntime runtime, + LambdaWorker.Sleeper sleeper) { + return handler(baseEnv(), configure, runtime, sleeper); + } + + private RequestHandler handler( + Map env, + java.util.function.Consumer configure, + FakeRuntime runtime, + LambdaWorker.Sleeper sleeper) { + try { + LambdaWorkerOptions options = LambdaWorkerOptions.fromEnvironment(env); + configure.accept(options); + return LambdaWorker.newHandler(VERSION, options, runtime, sleeper); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private Context context(int remainingTimeMillis) { + return new TestLambdaContext(remainingTimeMillis); + } + + private Context context(int remainingTimeMillis, String awsRequestId, String invokedFunctionArn) { + return new TestLambdaContext(remainingTimeMillis, awsRequestId, invokedFunctionArn); + } + + private Map baseEnv() { + Map env = new HashMap<>(); + env.put(LambdaWorkerOptions.TEMPORAL_CONFIG_FILE, "/nonexistent/temporal.toml"); + return env; + } + + private List events(String... events) { + List result = new ArrayList<>(); + for (String event : events) { + result.add(event); + } + return result; + } + + private static final class FakeRuntime implements LambdaWorkerRuntime { + private final List events = new ArrayList<>(); + private int createCount; + private WorkflowClientOptions clientOptions; + private WorkerOptions workerOptions; + + @Override + public Invocation create( + WorkflowServiceStubsOptions serviceStubsOptions, + WorkflowClientOptions clientOptions, + WorkerFactoryOptions workerFactoryOptions, + String taskQueue, + WorkerOptions workerOptions) { + createCount++; + events.add("create"); + this.clientOptions = clientOptions; + this.workerOptions = workerOptions; + return new FakeInvocation(events); + } + } + + private static final class FakeInvocation implements LambdaWorkerRuntime.Invocation { + private final List events; + private final WorkerRegistrar registrar; + + private FakeInvocation(List events) { + this.events = events; + this.registrar = new FakeWorkerRegistrar(events); + } + + @Override + public WorkerRegistrar getWorkerRegistrar() { + return registrar; + } + + @Override + public void start() { + events.add("start"); + } + + @Override + public void shutdown() { + events.add("shutdown"); + } + + @Override + public void awaitTermination(Duration timeout) { + events.add("await:" + timeout.toMillis()); + } + + @Override + public void closeStubs(Duration timeout) { + events.add("close:" + timeout.toMillis()); + } + } + + private static final class FakeWorkerRegistrar implements WorkerRegistrar { + private final List events; + + private FakeWorkerRegistrar(List events) { + this.events = events; + } + + @Override + public void registerWorkflowImplementationTypes(Class... workflowImplementationClasses) { + events.add("registerWorkflowTypes:" + workflowImplementationClasses.length); + } + + @Override + public void registerWorkflowImplementationTypes( + WorkflowImplementationOptions options, Class... workflowImplementationClasses) { + events.add("registerWorkflowTypesWithOptions:" + workflowImplementationClasses.length); + } + + @Override + public void registerWorkflowImplementationFactory( + Class workflowInterface, Functions.Func factory) { + events.add("registerWorkflowFactory"); + } + + @Override + public void registerWorkflowImplementationFactory( + Class workflowInterface, + Functions.Func1 factory, + WorkflowImplementationOptions options) { + events.add("registerWorkflowFactoryWithArgs"); + } + + @Override + public void registerWorkflowImplementationFactory( + Class workflowInterface, + Functions.Func factory, + WorkflowImplementationOptions options) { + events.add("registerWorkflowFactoryWithOptions"); + } + + @Override + public void registerActivitiesImplementations(Object... activityImplementations) { + events.add("registerActivities:" + activityImplementations.length); + } + + @Override + public void registerNexusServiceImplementation(Object... nexusServiceImplementations) { + events.add("registerNexus:" + nexusServiceImplementations.length); + } + } + + private static final class TestWorkflowImpl {} +} diff --git a/contrib/temporal-aws-lambda/src/test/java/io/temporal/aws/lambda/LambdaWorkerOptionsTest.java b/contrib/temporal-aws-lambda/src/test/java/io/temporal/aws/lambda/LambdaWorkerOptionsTest.java new file mode 100644 index 0000000000..9538b57078 --- /dev/null +++ b/contrib/temporal-aws-lambda/src/test/java/io/temporal/aws/lambda/LambdaWorkerOptionsTest.java @@ -0,0 +1,243 @@ +package io.temporal.aws.lambda; + +import static org.junit.Assert.*; + +import io.temporal.client.WorkflowClientOptions; +import io.temporal.common.VersioningBehavior; +import io.temporal.common.WorkerDeploymentVersion; +import io.temporal.worker.WorkerDeploymentOptions; +import io.temporal.worker.WorkerFactoryOptions; +import io.temporal.worker.WorkerOptions; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class LambdaWorkerOptionsTest { + private static final WorkerDeploymentVersion VERSION = + new WorkerDeploymentVersion("deployment", "build"); + + @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Test + public void lambdaDefaultsAreAppliedToUnsetOptions() throws IOException { + LambdaWorkerOptions options = LambdaWorkerOptions.fromEnvironment(baseEnv()); + options.setTaskQueue("task-queue"); + + LambdaWorkerOptions.Materialized materialized = options.materialize(VERSION, "request@arn"); + + WorkerOptions workerOptions = materialized.workerOptions; + assertEquals(2, workerOptions.getMaxConcurrentActivityExecutionSize()); + assertEquals(10, workerOptions.getMaxConcurrentWorkflowTaskExecutionSize()); + assertEquals(2, workerOptions.getMaxConcurrentLocalActivityExecutionSize()); + assertEquals(5, workerOptions.getMaxConcurrentNexusExecutionSize()); + assertEquals(2, workerOptions.getMaxConcurrentWorkflowTaskPollers()); + assertEquals(1, workerOptions.getMaxConcurrentActivityTaskPollers()); + assertEquals(1, workerOptions.getMaxConcurrentNexusTaskPollers()); + assertTrue(workerOptions.isEagerExecutionDisabled()); + + WorkerFactoryOptions factoryOptions = materialized.workerFactoryOptions; + assertEquals(30, factoryOptions.getWorkflowCacheSize()); + assertEquals(30, factoryOptions.getMaxWorkflowThreadCount()); + + WorkerDeploymentOptions deploymentOptions = workerOptions.getDeploymentOptions(); + assertTrue(deploymentOptions.isUsingVersioning()); + assertEquals(VERSION, deploymentOptions.getVersion()); + assertEquals(VersioningBehavior.PINNED, deploymentOptions.getDefaultVersioningBehavior()); + assertEquals(Duration.ofSeconds(5), materialized.gracefulShutdownTimeout); + assertEquals(Duration.ofSeconds(7), materialized.shutdownDeadlineBuffer); + } + + @Test + public void userOverridesWinOverLambdaDefaults() throws IOException { + LambdaWorkerOptions options = LambdaWorkerOptions.fromEnvironment(baseEnv()); + options.setTaskQueue("task-queue"); + options.getWorkerOptionsBuilder().setMaxConcurrentActivityExecutionSize(11); + options.getWorkerOptionsBuilder().setMaxConcurrentWorkflowTaskExecutionSize(12); + options.getWorkerOptionsBuilder().setMaxConcurrentLocalActivityExecutionSize(13); + options.getWorkerOptionsBuilder().setMaxConcurrentNexusExecutionSize(14); + options.getWorkerOptionsBuilder().setMaxConcurrentWorkflowTaskPollers(3); + options.getWorkerOptionsBuilder().setMaxConcurrentActivityTaskPollers(4); + options.getWorkerOptionsBuilder().setMaxConcurrentNexusTaskPollers(6); + options + .getWorkerOptionsBuilder() + .setDeploymentOptions( + WorkerDeploymentOptions.newBuilder() + .setUseVersioning(true) + .setVersion(new WorkerDeploymentVersion("ignored", "ignored")) + .setDefaultVersioningBehavior(VersioningBehavior.AUTO_UPGRADE) + .build()); + options.getWorkerFactoryOptionsBuilder().setWorkflowCacheSize(17); + options.getWorkerFactoryOptionsBuilder().setMaxWorkflowThreadCount(18); + + LambdaWorkerOptions.Materialized materialized = options.materialize(VERSION, "request@arn"); + + WorkerOptions workerOptions = materialized.workerOptions; + assertEquals(11, workerOptions.getMaxConcurrentActivityExecutionSize()); + assertEquals(12, workerOptions.getMaxConcurrentWorkflowTaskExecutionSize()); + assertEquals(13, workerOptions.getMaxConcurrentLocalActivityExecutionSize()); + assertEquals(14, workerOptions.getMaxConcurrentNexusExecutionSize()); + assertEquals(3, workerOptions.getMaxConcurrentWorkflowTaskPollers()); + assertEquals(4, workerOptions.getMaxConcurrentActivityTaskPollers()); + assertEquals(6, workerOptions.getMaxConcurrentNexusTaskPollers()); + assertTrue(workerOptions.isEagerExecutionDisabled()); + assertEquals(17, materialized.workerFactoryOptions.getWorkflowCacheSize()); + assertEquals(18, materialized.workerFactoryOptions.getMaxWorkflowThreadCount()); + assertTrue(workerOptions.getDeploymentOptions().isUsingVersioning()); + assertEquals(VERSION, workerOptions.getDeploymentOptions().getVersion()); + assertEquals( + VersioningBehavior.AUTO_UPGRADE, + workerOptions.getDeploymentOptions().getDefaultVersioningBehavior()); + } + + @Test + public void temporalTaskQueueEnvPopulatesTaskQueue() throws IOException { + Map env = baseEnv(); + env.put(LambdaWorkerOptions.TEMPORAL_TASK_QUEUE, "env-task-queue"); + + LambdaWorkerOptions.Materialized materialized = + LambdaWorkerOptions.fromEnvironment(env).materialize(VERSION, "request@arn"); + + assertEquals("env-task-queue", materialized.taskQueue); + } + + @Test + public void missingTaskQueueFailsBeforeRuntimeCreation() throws IOException { + LambdaWorkerOptions options = LambdaWorkerOptions.fromEnvironment(baseEnv()); + + IllegalStateException e = + assertThrows( + IllegalStateException.class, () -> options.materialize(VERSION, "request@arn")); + + assertTrue(e.getMessage().contains("Task queue must be set")); + } + + @Test + public void gracefulShutdownTimeoutRecomputesDefaultShutdownDeadlineBuffer() throws IOException { + LambdaWorkerOptions options = LambdaWorkerOptions.fromEnvironment(baseEnv()); + options.setTaskQueue("task-queue"); + options.setGracefulShutdownTimeout(Duration.ofSeconds(3)); + + LambdaWorkerOptions.Materialized materialized = options.materialize(VERSION, "request@arn"); + + assertEquals(Duration.ofSeconds(3), materialized.gracefulShutdownTimeout); + assertEquals(Duration.ofSeconds(5), materialized.shutdownDeadlineBuffer); + } + + @Test + public void explicitShutdownDeadlineBufferIsNotRecomputed() throws IOException { + LambdaWorkerOptions options = LambdaWorkerOptions.fromEnvironment(baseEnv()); + options.setTaskQueue("task-queue"); + options.setShutdownDeadlineBuffer(Duration.ofSeconds(9)); + options.setGracefulShutdownTimeout(Duration.ofSeconds(3)); + + LambdaWorkerOptions.Materialized materialized = options.materialize(VERSION, "request@arn"); + + assertEquals(Duration.ofSeconds(3), materialized.gracefulShutdownTimeout); + assertEquals(Duration.ofSeconds(9), materialized.shutdownDeadlineBuffer); + } + + @Test + public void temporalConfigFileTakesPrecedenceOverLambdaTaskRoot() throws IOException { + File explicitConfig = temporaryFolder.newFile("explicit.toml"); + Files.write( + explicitConfig.toPath(), + "[profile.default]\nnamespace = \"explicit\"\n".getBytes(StandardCharsets.UTF_8)); + File taskRoot = temporaryFolder.newFolder("task-root"); + Files.write( + new File(taskRoot, "temporal.toml").toPath(), + "[profile.default]\nnamespace = \"lambda-root\"\n".getBytes(StandardCharsets.UTF_8)); + + Map env = new HashMap<>(); + env.put(LambdaWorkerOptions.TEMPORAL_CONFIG_FILE, explicitConfig.getAbsolutePath()); + env.put(LambdaWorkerOptions.LAMBDA_TASK_ROOT, taskRoot.getAbsolutePath()); + + LambdaWorkerOptions options = LambdaWorkerOptions.fromEnvironment(env); + options.setTaskQueue("task-queue"); + WorkflowClientOptions clientOptions = options.materialize(VERSION, "request@arn").clientOptions; + + assertEquals("explicit", clientOptions.getNamespace()); + } + + @Test + public void lambdaTaskRootTemporalTomlWinsOverCwdTemporalToml() throws IOException { + File taskRoot = temporaryFolder.newFolder("task-root"); + writeConfig(taskRoot, "lambda-root"); + File cwd = temporaryFolder.newFolder("cwd"); + writeConfig(cwd, "cwd"); + + Map env = new HashMap<>(); + env.put(LambdaWorkerOptions.LAMBDA_TASK_ROOT, taskRoot.getAbsolutePath()); + + LambdaWorkerOptions options = LambdaWorkerOptions.fromEnvironment(env, cwd); + options.setTaskQueue("task-queue"); + + assertEquals( + "lambda-root", options.materialize(VERSION, "request@arn").clientOptions.getNamespace()); + } + + @Test + public void cwdTemporalTomlIsUsedWhenLambdaTaskRootIsUnset() throws IOException { + File cwd = temporaryFolder.newFolder("cwd"); + writeConfig(cwd, "cwd"); + + LambdaWorkerOptions options = LambdaWorkerOptions.fromEnvironment(new HashMap<>(), cwd); + options.setTaskQueue("task-queue"); + + assertEquals("cwd", options.materialize(VERSION, "request@arn").clientOptions.getNamespace()); + } + + @Test + public void cwdTemporalTomlIsUsedWhenLambdaTaskRootHasNoTemporalToml() throws IOException { + File taskRoot = temporaryFolder.newFolder("task-root"); + File cwd = temporaryFolder.newFolder("cwd"); + writeConfig(cwd, "cwd"); + Map env = new HashMap<>(); + env.put(LambdaWorkerOptions.LAMBDA_TASK_ROOT, taskRoot.getAbsolutePath()); + + LambdaWorkerOptions options = LambdaWorkerOptions.fromEnvironment(env, cwd); + options.setTaskQueue("task-queue"); + + assertEquals("cwd", options.materialize(VERSION, "request@arn").clientOptions.getNamespace()); + } + + @Test + public void envconfigDefaultsAreUsedWhenNoConfigFileCandidateExists() throws IOException { + File cwd = temporaryFolder.newFolder("cwd"); + + LambdaWorkerOptions options = LambdaWorkerOptions.fromEnvironment(new HashMap<>(), cwd); + options.setTaskQueue("task-queue"); + + assertEquals( + "default", options.materialize(VERSION, "request@arn").clientOptions.getNamespace()); + } + + @Test + public void deploymentVersionMustHaveDeploymentNameAndBuildId() { + assertThrows( + IllegalArgumentException.class, + () -> LambdaWorkerOptions.validateVersion(new WorkerDeploymentVersion("", "build"))); + assertThrows( + IllegalArgumentException.class, + () -> LambdaWorkerOptions.validateVersion(new WorkerDeploymentVersion("deployment", ""))); + } + + private Map baseEnv() { + Map env = new HashMap<>(); + env.put(LambdaWorkerOptions.TEMPORAL_CONFIG_FILE, "/nonexistent/temporal.toml"); + return env; + } + + private void writeConfig(File directory, String namespace) throws IOException { + Files.write( + new File(directory, "temporal.toml").toPath(), + ("[profile.default]\nnamespace = \"" + namespace + "\"\n") + .getBytes(StandardCharsets.UTF_8)); + } +} diff --git a/contrib/temporal-aws-lambda/src/test/java/io/temporal/aws/lambda/OtelLambdaWorkerTest.java b/contrib/temporal-aws-lambda/src/test/java/io/temporal/aws/lambda/OtelLambdaWorkerTest.java new file mode 100644 index 0000000000..64811c1439 --- /dev/null +++ b/contrib/temporal-aws-lambda/src/test/java/io/temporal/aws/lambda/OtelLambdaWorkerTest.java @@ -0,0 +1,399 @@ +package io.temporal.aws.lambda; + +import static org.junit.Assert.*; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.metrics.MeterBuilder; +import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.api.trace.TracerProvider; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.temporal.common.WorkerDeploymentVersion; +import java.io.Closeable; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Test; + +public class OtelLambdaWorkerTest { + private static final WorkerDeploymentVersion VERSION = + new WorkerDeploymentVersion("deployment", "build"); + + @Test + public void defaultsResolveEndpointAndServiceName() { + assertEquals( + "http://localhost:4317", OtelLambdaWorker.newBuilder(new HashMap<>()).getEndpoint()); + assertEquals( + "temporal-lambda-worker", OtelLambdaWorker.newBuilder(new HashMap<>()).getServiceName()); + + Map env = new HashMap<>(); + env.put(OtelLambdaWorker.OTEL_EXPORTER_OTLP_ENDPOINT, "http://collector:4317"); + env.put(OtelLambdaWorker.AWS_LAMBDA_FUNCTION_NAME, "function-name"); + assertEquals("http://collector:4317", OtelLambdaWorker.newBuilder(env).getEndpoint()); + assertEquals("function-name", OtelLambdaWorker.newBuilder(env).getServiceName()); + + env.put(OtelLambdaWorker.OTEL_SERVICE_NAME, "explicit-service"); + assertEquals("explicit-service", OtelLambdaWorker.newBuilder(env).getServiceName()); + } + + @Test + public void defaultFactoryCreatesExporterBackedSdkProviders() { + OpenTelemetry openTelemetry = + OtelLambdaWorker.newBuilder(new HashMap<>()).createOpenTelemetry(); + + assertTrue(openTelemetry instanceof OpenTelemetrySdk); + + OpenTelemetrySdk sdk = (OpenTelemetrySdk) openTelemetry; + assertNotNull(sdk.getSdkTracerProvider()); + assertNotNull(sdk.getSdkMeterProvider()); + sdk.shutdown().join(1, TimeUnit.SECONDS); + } + + @Test + public void defaultFactoryCreatesXRayTraceIds() { + OpenTelemetry openTelemetry = + OtelLambdaWorker.newBuilder(new HashMap<>()) + .setFlushTimeout(Duration.ofMillis(10)) + .createOpenTelemetry(); + long beforeSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); + + assertTrue(openTelemetry instanceof OpenTelemetrySdk); + + OpenTelemetrySdk sdk = (OpenTelemetrySdk) openTelemetry; + Span span = sdk.getSdkTracerProvider().get("test").spanBuilder("test").startSpan(); + try { + String traceId = span.getSpanContext().getTraceId(); + long afterSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); + long xrayTimestampSeconds = Long.parseLong(traceId.substring(0, 8), 16); + + assertTrue(xrayTimestampSeconds >= beforeSeconds); + assertTrue(xrayTimestampSeconds <= afterSeconds); + } finally { + sdk.shutdown().join(1, TimeUnit.SECONDS); + span.end(); + } + } + + @Test + public void exporterFactoryReceivesResolvedEndpointAndServiceName() throws Exception { + Map env = new HashMap<>(); + env.put(OtelLambdaWorker.OTEL_EXPORTER_OTLP_ENDPOINT, "http://collector:4317"); + env.put(OtelLambdaWorker.AWS_LAMBDA_FUNCTION_NAME, "function-name"); + RecordingTelemetryFactory factory = new RecordingTelemetryFactory(); + LambdaWorkerOptions options = LambdaWorkerOptions.fromEnvironment(baseEnv()); + + OtelLambdaWorker.newBuilder(env).setTelemetryFactory(factory).apply(options); + + assertEquals(1, factory.creates.get()); + assertEquals("http://collector:4317", factory.endpoint); + assertEquals("function-name", factory.serviceName); + } + + @Test + public void customEndpointAndServiceNameAreUsedByExporterFactory() throws Exception { + RecordingTelemetryFactory factory = new RecordingTelemetryFactory(); + LambdaWorkerOptions options = LambdaWorkerOptions.fromEnvironment(baseEnv()); + + OtelLambdaWorker.newBuilder(new HashMap<>()) + .setTelemetryFactory(factory) + .setEndpoint("http://custom-collector:4317") + .setServiceName("custom-service") + .setMetricsReportInterval(Duration.ofSeconds(3)) + .setFlushTimeout(Duration.ofSeconds(4)) + .apply(options); + + assertEquals(1, factory.creates.get()); + assertEquals("http://custom-collector:4317", factory.endpoint); + assertEquals("custom-service", factory.serviceName); + assertEquals(Duration.ofSeconds(3), factory.metricsReportInterval); + assertEquals(Duration.ofSeconds(4), factory.flushTimeout); + } + + @Test + public void metricsScopeAndTracingInterceptorsAreInstalled() throws Exception { + LambdaWorkerOptions options = LambdaWorkerOptions.fromEnvironment(baseEnv()); + + OtelLambdaWorker.configure( + options, builder -> builder.setOpenTelemetry(OpenTelemetry.noop()).setFlushHook(() -> {})); + + assertNotNull(options.getWorkflowServiceStubsOptionsBuilder().build().getMetricsScope()); + assertEquals(1, options.getWorkflowClientOptionsBuilder().build().getInterceptors().length); + assertEquals( + 1, options.getWorkerFactoryOptionsBuilder().build().getWorkerInterceptors().length); + } + + @Test + public void metricsOnlyInstallsScopeWithoutTracingInterceptors() throws Exception { + LambdaWorkerOptions options = LambdaWorkerOptions.fromEnvironment(baseEnv()); + + OtelLambdaWorker.configureMetrics(options, OpenTelemetry.noop()); + + assertNotNull(options.getWorkflowServiceStubsOptionsBuilder().build().getMetricsScope()); + assertEquals(0, clientInterceptorCount(options)); + assertEquals(0, workerInterceptorCount(options)); + } + + @Test + public void tracingOnlyInstallsInterceptorsWithoutMetricsScope() throws Exception { + LambdaWorkerOptions options = LambdaWorkerOptions.fromEnvironment(baseEnv()); + + OtelLambdaWorker.configureTracing(options, OpenTelemetry.noop()); + + assertNull(options.getWorkflowServiceStubsOptionsBuilder().build().getMetricsScope()); + assertEquals(1, clientInterceptorCount(options)); + assertEquals(1, workerInterceptorCount(options)); + } + + @Test + public void flushHookOnlyRunsOncePerInvocationAndDoesNotCloseProviders() { + CountingOpenTelemetry openTelemetry = new CountingOpenTelemetry(); + FakeRuntime runtime = new FakeRuntime(); + RequestHandler handler = + handler( + options -> { + options.setTaskQueue("task-queue"); + OtelLambdaWorker.configureFlushHook(options, openTelemetry, Duration.ofSeconds(1)); + assertNull(options.getWorkflowServiceStubsOptionsBuilder().build().getMetricsScope()); + assertEquals(0, clientInterceptorCount(options)); + assertEquals(0, workerInterceptorCount(options)); + }, + runtime, + duration -> {}); + + handler.handleRequest(null, context()); + handler.handleRequest(null, context()); + + assertEquals(2, openTelemetry.tracerProvider.flushes.get()); + assertEquals(2, openTelemetry.meterProvider.flushes.get()); + assertEquals(0, openTelemetry.tracerProvider.closes.get()); + assertEquals(0, openTelemetry.meterProvider.closes.get()); + } + + @Test + public void customOpenTelemetryBypassesExporterCreation() throws Exception { + RecordingTelemetryFactory factory = new RecordingTelemetryFactory(); + LambdaWorkerOptions options = LambdaWorkerOptions.fromEnvironment(baseEnv()); + + OtelLambdaWorker.newBuilder(new HashMap<>()) + .setTelemetryFactory(factory) + .setOpenTelemetry(OpenTelemetry.noop()) + .apply(options); + + assertEquals(0, factory.creates.get()); + assertNotNull(options.getWorkflowServiceStubsOptionsBuilder().build().getMetricsScope()); + } + + @Test + public void flushHookRunsOncePerInvocationAndDoesNotCloseProviders() { + CountingOpenTelemetry openTelemetry = new CountingOpenTelemetry(); + FakeRuntime runtime = new FakeRuntime(); + RequestHandler handler = + handler( + options -> { + options.setTaskQueue("task-queue"); + OtelLambdaWorker.configure( + options, builder -> builder.setOpenTelemetry(openTelemetry)); + }, + runtime, + duration -> {}); + + handler.handleRequest(null, context()); + handler.handleRequest(null, context()); + + assertEquals(2, openTelemetry.tracerProvider.flushes.get()); + assertEquals(2, openTelemetry.meterProvider.flushes.get()); + assertEquals(0, openTelemetry.tracerProvider.closes.get()); + assertEquals(0, openTelemetry.meterProvider.closes.get()); + } + + private Context context() { + return new TestLambdaContext(20_000); + } + + private Map baseEnv() { + Map env = new HashMap<>(); + env.put(LambdaWorkerOptions.TEMPORAL_CONFIG_FILE, "/nonexistent/temporal.toml"); + return env; + } + + private RequestHandler handler( + java.util.function.Consumer configure, + FakeRuntime runtime, + LambdaWorker.Sleeper sleeper) { + try { + LambdaWorkerOptions options = LambdaWorkerOptions.fromEnvironment(baseEnv()); + configure.accept(options); + return LambdaWorker.newHandler(VERSION, options, runtime, sleeper); + } catch (java.io.IOException e) { + throw new RuntimeException(e); + } + } + + private int clientInterceptorCount(LambdaWorkerOptions options) { + io.temporal.common.interceptors.WorkflowClientInterceptor[] interceptors = + options.getWorkflowClientOptionsBuilder().build().getInterceptors(); + return interceptors == null ? 0 : interceptors.length; + } + + private int workerInterceptorCount(LambdaWorkerOptions options) { + io.temporal.common.interceptors.WorkerInterceptor[] interceptors = + options.getWorkerFactoryOptionsBuilder().build().getWorkerInterceptors(); + return interceptors == null ? 0 : interceptors.length; + } + + private static final class RecordingTelemetryFactory + implements OtelLambdaWorker.TelemetryFactory { + private final AtomicInteger creates = new AtomicInteger(); + private String endpoint; + private String serviceName; + private Duration metricsReportInterval; + private Duration flushTimeout; + + @Override + public OpenTelemetry create( + String endpoint, + String serviceName, + Duration metricsReportInterval, + Duration flushTimeout) { + creates.incrementAndGet(); + this.endpoint = endpoint; + this.serviceName = serviceName; + this.metricsReportInterval = metricsReportInterval; + this.flushTimeout = flushTimeout; + return OpenTelemetry.noop(); + } + } + + private static final class CountingOpenTelemetry implements OpenTelemetry { + private final CountingTracerProvider tracerProvider = new CountingTracerProvider(); + private final CountingMeterProvider meterProvider = new CountingMeterProvider(); + + @Override + public TracerProvider getTracerProvider() { + return tracerProvider; + } + + @Override + public MeterProvider getMeterProvider() { + return meterProvider; + } + + @Override + public ContextPropagators getPropagators() { + return ContextPropagators.noop(); + } + } + + public static final class CountingTracerProvider implements TracerProvider, Closeable { + private final AtomicInteger flushes = new AtomicInteger(); + private final AtomicInteger closes = new AtomicInteger(); + + @Override + public Tracer get(String instrumentationName) { + return TracerProvider.noop().get(instrumentationName); + } + + @Override + public Tracer get(String instrumentationName, String instrumentationVersion) { + return TracerProvider.noop().get(instrumentationName, instrumentationVersion); + } + + public CompletableResultCode forceFlush() { + flushes.incrementAndGet(); + return CompletableResultCode.ofSuccess(); + } + + @Override + public void close() { + closes.incrementAndGet(); + } + } + + public static final class CountingMeterProvider implements MeterProvider, Closeable { + private final AtomicInteger flushes = new AtomicInteger(); + private final AtomicInteger closes = new AtomicInteger(); + + @Override + public MeterBuilder meterBuilder(String instrumentationName) { + return MeterProvider.noop().meterBuilder(instrumentationName); + } + + public CompletableResultCode forceFlush() { + flushes.incrementAndGet(); + return CompletableResultCode.ofSuccess(); + } + + @Override + public void close() { + closes.incrementAndGet(); + } + } + + private static final class FakeRuntime implements LambdaWorkerRuntime { + @Override + public Invocation create( + io.temporal.serviceclient.WorkflowServiceStubsOptions serviceStubsOptions, + io.temporal.client.WorkflowClientOptions clientOptions, + io.temporal.worker.WorkerFactoryOptions workerFactoryOptions, + String taskQueue, + io.temporal.worker.WorkerOptions workerOptions) { + return new Invocation() { + @Override + public WorkerRegistrar getWorkerRegistrar() { + return new NoopRegistrar(); + } + + @Override + public void start() {} + + @Override + public void shutdown() {} + + @Override + public void awaitTermination(java.time.Duration timeout) {} + + @Override + public void closeStubs(java.time.Duration timeout) {} + }; + } + } + + private static final class NoopRegistrar implements WorkerRegistrar { + @Override + public void registerWorkflowImplementationTypes(Class... workflowImplementationClasses) {} + + @Override + public void registerWorkflowImplementationTypes( + io.temporal.worker.WorkflowImplementationOptions options, + Class... workflowImplementationClasses) {} + + @Override + public void registerWorkflowImplementationFactory( + Class workflowInterface, io.temporal.workflow.Functions.Func factory) {} + + @Override + public void registerWorkflowImplementationFactory( + Class workflowInterface, + io.temporal.workflow.Functions.Func1 factory, + io.temporal.worker.WorkflowImplementationOptions options) {} + + @Override + public void registerWorkflowImplementationFactory( + Class workflowInterface, + io.temporal.workflow.Functions.Func factory, + io.temporal.worker.WorkflowImplementationOptions options) {} + + @Override + public void registerActivitiesImplementations(Object... activityImplementations) {} + + @Override + public void registerNexusServiceImplementation(Object... nexusServiceImplementations) {} + } +} diff --git a/contrib/temporal-aws-lambda/src/test/java/io/temporal/aws/lambda/TestLambdaContext.java b/contrib/temporal-aws-lambda/src/test/java/io/temporal/aws/lambda/TestLambdaContext.java new file mode 100644 index 0000000000..2d946466c8 --- /dev/null +++ b/contrib/temporal-aws-lambda/src/test/java/io/temporal/aws/lambda/TestLambdaContext.java @@ -0,0 +1,86 @@ +package io.temporal.aws.lambda; + +import com.amazonaws.services.lambda.runtime.ClientContext; +import com.amazonaws.services.lambda.runtime.CognitoIdentity; +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.LambdaLogger; + +final class TestLambdaContext implements Context { + private static final LambdaLogger NOOP_LOGGER = + new LambdaLogger() { + @Override + public void log(String message) {} + + @Override + public void log(byte[] message) {} + }; + + private final int remainingTimeMillis; + private final String awsRequestId; + private final String invokedFunctionArn; + + TestLambdaContext(int remainingTimeMillis) { + this(remainingTimeMillis, "request-id", "function-arn"); + } + + TestLambdaContext(int remainingTimeMillis, String awsRequestId, String invokedFunctionArn) { + this.remainingTimeMillis = remainingTimeMillis; + this.awsRequestId = awsRequestId; + this.invokedFunctionArn = invokedFunctionArn; + } + + @Override + public String getAwsRequestId() { + return awsRequestId; + } + + @Override + public String getLogGroupName() { + return "log-group"; + } + + @Override + public String getLogStreamName() { + return "log-stream"; + } + + @Override + public String getFunctionName() { + return "function"; + } + + @Override + public String getFunctionVersion() { + return "1"; + } + + @Override + public String getInvokedFunctionArn() { + return invokedFunctionArn; + } + + @Override + public CognitoIdentity getIdentity() { + return null; + } + + @Override + public ClientContext getClientContext() { + return null; + } + + @Override + public int getRemainingTimeInMillis() { + return remainingTimeMillis; + } + + @Override + public int getMemoryLimitInMB() { + return 128; + } + + @Override + public LambdaLogger getLogger() { + return NOOP_LOGGER; + } +} From 39c2b367c1b8a45d7df47fa5f366d0bbdd40a59b Mon Sep 17 00:00:00 2001 From: Edward Amsden Date: Thu, 4 Jun 2026 15:08:12 -0500 Subject: [PATCH 3/3] Support dynamic worker and activity registrations in Lambda --- contrib/temporal-aws-lambda/README.md | 2 + .../aws/lambda/LambdaWorkerOptions.java | 45 ++++++++++++++++ .../aws/lambda/LambdaWorkerLifecycleTest.java | 54 +++++++++++++++++++ .../aws/lambda/LambdaWorkerOptionsTest.java | 39 ++++++++++++++ 4 files changed, 140 insertions(+) diff --git a/contrib/temporal-aws-lambda/README.md b/contrib/temporal-aws-lambda/README.md index 905dd63632..2e93da5602 100644 --- a/contrib/temporal-aws-lambda/README.md +++ b/contrib/temporal-aws-lambda/README.md @@ -29,6 +29,8 @@ Connection options are loaded with `temporal-envconfig` when the handler is cons If you need to assemble options outside the `run` callback, call `LambdaWorkerOptions.fromEnvironment()`, mutate the returned options, and pass them to `LambdaWorker.newHandler(...)`. +Dynamic workflow and activity implementations can be registered with `registerDynamicWorkflowImplementationType(...)` and `registerDynamicActivityImplementation(...)`. Java SDK worker rules still apply: only one dynamic workflow implementation type and one dynamic activity implementation can be registered per worker. + The handler creates one worker per invocation, starts the worker, shuts it down before the Lambda deadline, runs shutdown hooks in order, and closes service stubs. Worker deployment versioning is always enabled for the supplied `WorkerDeploymentVersion`. If neither client nor worker identity is set by the user, each invocation uses `@` as the Temporal identity. `shutdownDeadlineBuffer` is the full shutdown window reserved at the end of the Lambda invocation. The default is 7 seconds: 5 seconds for `gracefulShutdownTimeout` and 2 seconds for hooks and service stubs. The worker runs until `remainingTime - shutdownDeadlineBuffer`, then stops and awaits termination for `gracefulShutdownTimeout`. If you change `gracefulShutdownTimeout` without explicitly setting `shutdownDeadlineBuffer`, the buffer is recomputed as `gracefulShutdownTimeout + 2s`. diff --git a/contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/LambdaWorkerOptions.java b/contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/LambdaWorkerOptions.java index 8e1d019e77..7848d04f0b 100644 --- a/contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/LambdaWorkerOptions.java +++ b/contrib/temporal-aws-lambda/src/main/java/io/temporal/aws/lambda/LambdaWorkerOptions.java @@ -1,5 +1,6 @@ package io.temporal.aws.lambda; +import io.temporal.activity.DynamicActivity; import io.temporal.client.WorkflowClientOptions; import io.temporal.common.VersioningBehavior; import io.temporal.common.WorkerDeploymentVersion; @@ -11,6 +12,7 @@ import io.temporal.worker.WorkerFactoryOptions; import io.temporal.worker.WorkerOptions; import io.temporal.worker.WorkflowImplementationOptions; +import io.temporal.workflow.DynamicWorkflow; import io.temporal.workflow.Functions; import java.io.File; import java.io.IOException; @@ -195,6 +197,36 @@ public LambdaWorkerOptions registerWorkflowImplementationTypes( return this; } + /** + * Registers a dynamic workflow implementation type. + * + *

Only one dynamic workflow implementation type can be registered per worker. + */ + public LambdaWorkerOptions registerDynamicWorkflowImplementationType( + Class workflowImplementationClass) { + final Class implementationClass = + Objects.requireNonNull(workflowImplementationClass, "workflowImplementationClass"); + registrations.add( + registrar -> registrar.registerWorkflowImplementationTypes(implementationClass)); + return this; + } + + /** + * Registers a dynamic workflow implementation type with custom workflow implementation options. + * + *

Only one dynamic workflow implementation type can be registered per worker. + */ + public LambdaWorkerOptions registerDynamicWorkflowImplementationType( + WorkflowImplementationOptions options, + Class workflowImplementationClass) { + Objects.requireNonNull(options, "options"); + final Class implementationClass = + Objects.requireNonNull(workflowImplementationClass, "workflowImplementationClass"); + registrations.add( + registrar -> registrar.registerWorkflowImplementationTypes(options, implementationClass)); + return this; + } + public LambdaWorkerOptions registerWorkflowImplementationFactory( Class workflowInterface, Functions.Func factory) { Objects.requireNonNull(workflowInterface, "workflowInterface"); @@ -237,6 +269,19 @@ public LambdaWorkerOptions registerActivitiesImplementations(Object... activityI return this; } + /** + * Registers a dynamic activity implementation. + * + *

Only one dynamic activity implementation can be registered per worker. + */ + public LambdaWorkerOptions registerDynamicActivityImplementation( + DynamicActivity activityImplementation) { + final DynamicActivity implementation = + Objects.requireNonNull(activityImplementation, "activityImplementation"); + registrations.add(registrar -> registrar.registerActivitiesImplementations(implementation)); + return this; + } + public LambdaWorkerOptions registerNexusServiceImplementation( Object... nexusServiceImplementations) { final Object[] implementations = diff --git a/contrib/temporal-aws-lambda/src/test/java/io/temporal/aws/lambda/LambdaWorkerLifecycleTest.java b/contrib/temporal-aws-lambda/src/test/java/io/temporal/aws/lambda/LambdaWorkerLifecycleTest.java index e31cf20259..bd11bbad98 100644 --- a/contrib/temporal-aws-lambda/src/test/java/io/temporal/aws/lambda/LambdaWorkerLifecycleTest.java +++ b/contrib/temporal-aws-lambda/src/test/java/io/temporal/aws/lambda/LambdaWorkerLifecycleTest.java @@ -4,12 +4,15 @@ import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; +import io.temporal.activity.DynamicActivity; import io.temporal.client.WorkflowClientOptions; import io.temporal.common.WorkerDeploymentVersion; +import io.temporal.common.converter.EncodedValues; import io.temporal.serviceclient.WorkflowServiceStubsOptions; import io.temporal.worker.WorkerFactoryOptions; import io.temporal.worker.WorkerOptions; import io.temporal.worker.WorkflowImplementationOptions; +import io.temporal.workflow.DynamicWorkflow; import io.temporal.workflow.Functions; import java.io.File; import java.io.IOException; @@ -66,6 +69,36 @@ public void invocationLifecycleRunsInOrder() { runtime.events); } + @Test + public void dynamicRegistrationsReplayBeforeWorkerStart() { + FakeRuntime runtime = new FakeRuntime(); + RequestHandler handler = + handler( + options -> + options + .setTaskQueue("task-queue") + .registerDynamicWorkflowImplementationType(TestDynamicWorkflow.class) + .registerDynamicWorkflowImplementationType( + WorkflowImplementationOptions.getDefaultInstance(), + TestDynamicWorkflowWithOptions.class) + .registerDynamicActivityImplementation(new TestDynamicActivity()), + runtime); + + handler.handleRequest(null, context(20_000)); + + assertEquals( + events( + "create", + "registerWorkflowTypes:1", + "registerWorkflowTypesWithOptions:1", + "registerActivities:1", + "start", + "shutdown", + "await:5000", + "close:2000"), + runtime.events); + } + @Test public void configureRunsOnceAndRuntimeIsCreatedOncePerInvocation() { AtomicInteger configureCount = new AtomicInteger(); @@ -381,4 +414,25 @@ public void registerNexusServiceImplementation(Object... nexusServiceImplementat } private static final class TestWorkflowImpl {} + + private static final class TestDynamicWorkflow implements DynamicWorkflow { + @Override + public Object execute(EncodedValues args) { + return null; + } + } + + private static final class TestDynamicWorkflowWithOptions implements DynamicWorkflow { + @Override + public Object execute(EncodedValues args) { + return null; + } + } + + private static final class TestDynamicActivity implements DynamicActivity { + @Override + public Object execute(EncodedValues args) { + return null; + } + } } diff --git a/contrib/temporal-aws-lambda/src/test/java/io/temporal/aws/lambda/LambdaWorkerOptionsTest.java b/contrib/temporal-aws-lambda/src/test/java/io/temporal/aws/lambda/LambdaWorkerOptionsTest.java index 9538b57078..45a2b1a29f 100644 --- a/contrib/temporal-aws-lambda/src/test/java/io/temporal/aws/lambda/LambdaWorkerOptionsTest.java +++ b/contrib/temporal-aws-lambda/src/test/java/io/temporal/aws/lambda/LambdaWorkerOptionsTest.java @@ -2,12 +2,16 @@ import static org.junit.Assert.*; +import io.temporal.activity.DynamicActivity; import io.temporal.client.WorkflowClientOptions; import io.temporal.common.VersioningBehavior; import io.temporal.common.WorkerDeploymentVersion; +import io.temporal.common.converter.EncodedValues; import io.temporal.worker.WorkerDeploymentOptions; import io.temporal.worker.WorkerFactoryOptions; import io.temporal.worker.WorkerOptions; +import io.temporal.worker.WorkflowImplementationOptions; +import io.temporal.workflow.DynamicWorkflow; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -228,6 +232,27 @@ public void deploymentVersionMustHaveDeploymentNameAndBuildId() { () -> LambdaWorkerOptions.validateVersion(new WorkerDeploymentVersion("deployment", ""))); } + @Test + public void dynamicRegistrationMethodsRejectNullInputs() throws IOException { + LambdaWorkerOptions options = LambdaWorkerOptions.fromEnvironment(baseEnv()); + + assertThrows( + NullPointerException.class, + () -> + options.registerDynamicWorkflowImplementationType( + (Class) null)); + assertThrows( + NullPointerException.class, + () -> options.registerDynamicWorkflowImplementationType(null, TestDynamicWorkflow.class)); + assertThrows( + NullPointerException.class, + () -> + options.registerDynamicWorkflowImplementationType( + WorkflowImplementationOptions.getDefaultInstance(), null)); + assertThrows( + NullPointerException.class, () -> options.registerDynamicActivityImplementation(null)); + } + private Map baseEnv() { Map env = new HashMap<>(); env.put(LambdaWorkerOptions.TEMPORAL_CONFIG_FILE, "/nonexistent/temporal.toml"); @@ -240,4 +265,18 @@ private void writeConfig(File directory, String namespace) throws IOException { ("[profile.default]\nnamespace = \"" + namespace + "\"\n") .getBytes(StandardCharsets.UTF_8)); } + + private static final class TestDynamicWorkflow implements DynamicWorkflow { + @Override + public Object execute(EncodedValues args) { + return null; + } + } + + private static final class TestDynamicActivity implements DynamicActivity { + @Override + public Object execute(EncodedValues args) { + return null; + } + } }