Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions contrib/temporal-aws-lambda/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Temporal AWS Lambda worker module

This module provides a direct AWS Lambda Java handler for running a Temporal worker for one Lambda invocation.

## Usage

Add `temporal-aws-lambda` next to your Temporal SDK dependency, then expose the returned handler from your Lambda class:

```java
import com.amazonaws.services.lambda.runtime.RequestHandler;
import io.temporal.aws.lambda.LambdaWorker;
import io.temporal.common.WorkerDeploymentVersion;

public final class Handler {
public static final RequestHandler<Object, Void> HANDLER =
LambdaWorker.run(
new WorkerDeploymentVersion("orders-worker", "2026-06-02"),
options ->
options
.setTaskQueue("orders")
.registerWorkflowImplementationTypes(OrderWorkflowImpl.class)
.registerActivitiesImplementations(new OrderActivitiesImpl()));
}
```

`TEMPORAL_TASK_QUEUE` can provide the task queue. If it is not set, call `setTaskQueue`.

Connection options are loaded with `temporal-envconfig` when the handler is constructed during Lambda cold start. The Lambda worker checks `TEMPORAL_CONFIG_FILE` first, then readable `$LAMBDA_TASK_ROOT/temporal.toml`, then readable `./temporal.toml`, then falls back to the envconfig defaults and Temporal environment variables. The `configure` callback also runs during handler construction, so non-invocation configuration is prepared once and reused.

If you need to assemble options outside the `run` callback, call `LambdaWorkerOptions.fromEnvironment()`, mutate the returned options, and pass them to `LambdaWorker.newHandler(...)`.

Dynamic workflow and activity implementations can be registered with `registerDynamicWorkflowImplementationType(...)` and `registerDynamicActivityImplementation(...)`. Java SDK worker rules still apply: only one dynamic workflow implementation type and one dynamic activity implementation can be registered per worker.

The handler creates one worker per invocation, starts the worker, shuts it down before the Lambda deadline, runs shutdown hooks in order, and closes service stubs. Worker deployment versioning is always enabled for the supplied `WorkerDeploymentVersion`. If neither client nor worker identity is set by the user, each invocation uses `<awsRequestId>@<invokedFunctionArn>` as the Temporal identity.

`shutdownDeadlineBuffer` is the full shutdown window reserved at the end of the Lambda invocation. The default is 7 seconds: 5 seconds for `gracefulShutdownTimeout` and 2 seconds for hooks and service stubs. The worker runs until `remainingTime - shutdownDeadlineBuffer`, then stops and awaits termination for `gracefulShutdownTimeout`. If you change `gracefulShutdownTimeout` without explicitly setting `shutdownDeadlineBuffer`, the buffer is recomputed as `gracefulShutdownTimeout + 2s`.

## OpenTelemetry

`OtelLambdaWorker.configure(options)` creates an OpenTelemetry SDK with OTLP metric and trace exporters by default, uses AWS X-Ray-compatible trace ID generation, installs an OpenTelemetry-backed Tally metrics scope, configures tracing through the SDK OpenTracing interceptor path, and registers a per-invocation flush hook.

```java
public static final RequestHandler<Object, Void> HANDLER =
LambdaWorker.run(
new WorkerDeploymentVersion("orders-worker", "2026-06-02"),
options -> {
OtelLambdaWorker.configure(options);
options
.setTaskQueue("orders")
.registerWorkflowImplementationTypes(OrderWorkflowImpl.class);
});
```

The helper defaults the OTLP endpoint from `OTEL_EXPORTER_OTLP_ENDPOINT`, then `http://localhost:4317`. It defaults the service name from `OTEL_SERVICE_NAME`, then `AWS_LAMBDA_FUNCTION_NAME`, then `temporal-lambda-worker`, and sets it on the OpenTelemetry resource. To use an application-owned provider, call `builder.setOpenTelemetry(...)`; in that path, no exporters are created and the helper only installs the metrics scope, interceptors, and per-invocation flush hook. Providers and scopes are not closed after each invocation.

Use `OtelLambdaWorker.configureMetrics(...)`, `OtelLambdaWorker.configureTracing(...)`, and `OtelLambdaWorker.configureFlushHook(...)` when you want to compose metrics, tracing, or provider flushing separately around an application-owned OpenTelemetry instance.

For Java logging, this module depends on `slf4j-api` only. It does not bundle a runtime logging binding, so Lambda log formatting remains owned by the application.
31 changes: 31 additions & 0 deletions contrib/temporal-aws-lambda/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
description = '''Temporal Java SDK AWS Lambda Worker Support Module'''

ext {
awsLambdaJavaCoreVersion = '1.4.0'
otelVersion = '1.25.0'
otShimVersion = "${otelVersion}-alpha"
}

dependencies {
api platform("io.opentelemetry:opentelemetry-bom:$otelVersion")

// This module shouldn't carry temporal-sdk with it, especially for situations when users may
// be using a shaded artifact.
compileOnly project(':temporal-sdk')

api "com.amazonaws:aws-lambda-java-core:$awsLambdaJavaCoreVersion"

implementation project(':temporal-envconfig')
implementation project(':temporal-opentracing')
implementation "io.opentelemetry:opentelemetry-api"
implementation "io.opentelemetry.contrib:opentelemetry-aws-xray:$otelVersion"
implementation "io.opentelemetry:opentelemetry-exporter-otlp"
implementation "io.opentelemetry:opentelemetry-opentracing-shim:$otShimVersion"
implementation "io.opentelemetry:opentelemetry-sdk"
implementation "org.slf4j:slf4j-api:$slf4jVersion"

testImplementation project(':temporal-sdk')
testImplementation "junit:junit:${junitVersion}"

testRuntimeOnly group: 'ch.qos.logback', name: 'logback-classic', version: "${logbackVersion}"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package io.temporal.aws.lambda;

import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowClientOptions;
import io.temporal.common.converter.EncodedValues;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.worker.WorkerFactoryOptions;
import io.temporal.worker.WorkerOptions;
import io.temporal.worker.WorkflowImplementationOptions;
import io.temporal.workflow.Functions;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

final class DefaultLambdaWorkerRuntime implements LambdaWorkerRuntime {
@Override
public Invocation create(
WorkflowServiceStubsOptions serviceStubsOptions,
WorkflowClientOptions clientOptions,
WorkerFactoryOptions workerFactoryOptions,
String taskQueue,
WorkerOptions workerOptions) {
WorkflowServiceStubs stubs = WorkflowServiceStubs.newServiceStubs(serviceStubsOptions);
try {
WorkflowClient client = WorkflowClient.newInstance(stubs, clientOptions);
WorkerFactory factory = WorkerFactory.newInstance(client, workerFactoryOptions);
Worker worker = factory.newWorker(taskQueue, workerOptions);
return new DefaultInvocation(stubs, factory, worker);
} catch (RuntimeException e) {
stubs.shutdownNow();
throw e;
}
}

private static final class DefaultInvocation implements Invocation {
private final WorkflowServiceStubs stubs;
private final WorkerFactory factory;
private final WorkerRegistrar registrar;

private DefaultInvocation(WorkflowServiceStubs stubs, WorkerFactory factory, Worker worker) {
this.stubs = stubs;
this.factory = factory;
this.registrar = new DefaultWorkerRegistrar(worker);
}

@Override
public WorkerRegistrar getWorkerRegistrar() {
return registrar;
}

@Override
public void start() {
factory.start();
}

@Override
public void shutdown() {
factory.shutdown();
}

@Override
public void awaitTermination(Duration timeout) {
factory.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
}

@Override
public void closeStubs(Duration timeout) {
stubs.shutdown();
if (!stubs.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
stubs.shutdownNow();
}
}
}

private static final class DefaultWorkerRegistrar implements WorkerRegistrar {
private final Worker worker;

private DefaultWorkerRegistrar(Worker worker) {
this.worker = worker;
}

@Override
public void registerWorkflowImplementationTypes(Class<?>... workflowImplementationClasses) {
worker.registerWorkflowImplementationTypes(workflowImplementationClasses);
}

@Override
public void registerWorkflowImplementationTypes(
WorkflowImplementationOptions options, Class<?>... workflowImplementationClasses) {
worker.registerWorkflowImplementationTypes(options, workflowImplementationClasses);
}

@Override
public <R> void registerWorkflowImplementationFactory(
Class<R> workflowInterface, Functions.Func<R> factory) {
worker.registerWorkflowImplementationFactory(workflowInterface, factory);
}

@Override
public <R> void registerWorkflowImplementationFactory(
Class<R> workflowInterface,
Functions.Func1<EncodedValues, R> factory,
WorkflowImplementationOptions options) {
worker.registerWorkflowImplementationFactory(workflowInterface, factory, options);
}

@Override
public <R> void registerWorkflowImplementationFactory(
Class<R> workflowInterface,
Functions.Func<R> factory,
WorkflowImplementationOptions options) {
worker.registerWorkflowImplementationFactory(workflowInterface, factory, options);
}

@Override
public void registerActivitiesImplementations(Object... activityImplementations) {
worker.registerActivitiesImplementations(activityImplementations);
}

@Override
public void registerNexusServiceImplementation(Object... nexusServiceImplementations) {
worker.registerNexusServiceImplementation(nexusServiceImplementations);
}
}
}
Loading
Loading