diff --git a/temporal-sdk/src/main/java/io/temporal/payload/storage/ExternalStorage.java b/temporal-sdk/src/main/java/io/temporal/payload/storage/ExternalStorage.java
new file mode 100644
index 0000000000..7ed7b07088
--- /dev/null
+++ b/temporal-sdk/src/main/java/io/temporal/payload/storage/ExternalStorage.java
@@ -0,0 +1,105 @@
+package io.temporal.payload.storage;
+
+import com.google.common.base.Preconditions;
+import io.temporal.common.Experimental;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/** Configuration for offloading large payloads to external storage. */
+@Experimental
+public final class ExternalStorage {
+  private static final int DEFAULT_PAYLOAD_SIZE_THRESHOLD = 256 * 1024;
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  private final @Nonnull List<StorageDriver> drivers;
+  private final @Nonnull StorageDriverSelector driverSelector;
+  private final int payloadSizeThreshold;
+
+  private ExternalStorage(
+      @Nonnull List<StorageDriver> drivers,
+      @Nonnull StorageDriverSelector driverSelector,
+      int payloadSizeThreshold) {
+    this.drivers = Collections.unmodifiableList(new ArrayList<>(drivers)); // defensive copy
+    this.driverSelector = driverSelector;
+    this.payloadSizeThreshold = payloadSizeThreshold;
+  }
+
+  @Nonnull
+  public List<StorageDriver> getDrivers() {
+    return drivers;
+  }
+
+  @Nonnull
+  public StorageDriverSelector getDriverSelector() {
+    return driverSelector;
+  }
+
+  /**
+   * Minimum payload size in bytes before external storage is considered. {@code 0} stores all
+   * payloads. Defaults to 256 KiB.
+   */
+  public int getPayloadSizeThreshold() {
+    return payloadSizeThreshold;
+  }
+
+  public static final class Builder {
+    private List<StorageDriver> drivers = Collections.emptyList();
+    private StorageDriverSelector driverSelector;
+    private int payloadSizeThreshold = DEFAULT_PAYLOAD_SIZE_THRESHOLD;
+
+    private Builder() {}
+
+    /** At least one driver is required. When more than one is set, a selector is also required. */
+    public Builder setDrivers(@Nonnull List<StorageDriver> drivers) {
+      this.drivers = Objects.requireNonNull(drivers, "drivers");
+      return this;
+    }
+
+    /** Convenience for registering a single driver; no selector is needed in this case. */
+    public Builder setDriver(@Nonnull StorageDriver driver) {
+      return setDrivers(Collections.singletonList(Objects.requireNonNull(driver, "driver")));
+    }
+
+    /** Required when more than one driver is registered; with a single driver it is optional. */
+    public Builder setDriverSelector(@Nullable StorageDriverSelector driverSelector) {
+      this.driverSelector = driverSelector;
+      return this;
+    }
+
+    /** Set to {@code 0} to store all payloads. Defaults to 256 KiB. */
+    public Builder setPayloadSizeThreshold(int payloadSizeThreshold) {
+      this.payloadSizeThreshold = payloadSizeThreshold;
+      return this;
+    }
+
+    public ExternalStorage build() {
+      Preconditions.checkState(!drivers.isEmpty(), "At least one driver must be provided");
+      Preconditions.checkState(
+          payloadSizeThreshold >= 0, "payloadSizeThreshold must be greater than or equal to zero");
+      Set<String> names = new HashSet<>(); // names seen so far, to detect duplicates
+      for (StorageDriver driver : drivers) {
+        String name = driver.getName();
+        Preconditions.checkState(
+            names.add(name), "Multiple drivers registered with name '%s'", name);
+      }
+      Preconditions.checkState(
+          drivers.size() == 1 || driverSelector != null,
+          "driverSelector must be specified when more than one driver is registered");
+      StorageDriverSelector selector = driverSelector;
+      if (selector == null) { // only reachable with exactly one driver (checked above)
+        StorageDriver driver = drivers.get(0);
+        selector = (context, payload) -> driver; // sole driver receives every payload
+      }
+      return new ExternalStorage(drivers, selector, payloadSizeThreshold);
+    }
+  }
+}
diff --git a/temporal-sdk/src/main/java/io/temporal/payload/storage/StorageDriver.java b/temporal-sdk/src/main/java/io/temporal/payload/storage/StorageDriver.java
new file mode 100644
index 0000000000..b970722e42
--- /dev/null
+++ b/temporal-sdk/src/main/java/io/temporal/payload/storage/StorageDriver.java
@@ -0,0 +1,44 @@
+package io.temporal.payload.storage;
+
+import io.temporal.api.common.v1.Payload;
+import io.temporal.common.Experimental;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import javax.annotation.Nonnull;
+
+/** Stores and retrieves payloads in an external storage system. */
+@Experimental
+public interface StorageDriver {
+  /**
+   * Name of this driver instance, unique among the drivers registered in a single {@link
+   * ExternalStorage}. Used as the routing key recorded in a stored payload's reference and resolved
+   * back to this driver on retrieval.
+   */
+  @Nonnull
+  String getName();
+
+  /**
+   * Stable, implementation-level identifier for this driver, the same across all instances of the
+   * driver class and ideally across SDKs (e.g. {@code "aws.s3driver"}). Used for metrics and worker
+   * heartbeat reporting, so it must not be derived from anything that changes between versions or
+   * refactors.
+   */
+  @Nonnull
+  String getType();
+
+  /**
+   * Stores {@code payloads} and returns one {@link StorageDriverClaim} per payload, in the same
+   * order. The returned list must be the same length as {@code payloads}.
+   */
+  @Nonnull
+  CompletableFuture<List<StorageDriverClaim>> store(
+      @Nonnull StorageDriverStoreContext context, @Nonnull List<Payload> payloads);
+
+  /**
+   * Retrieves the payloads identified by {@code claims} and returns one {@link Payload} per claim,
+   * in the same order. The returned list must be the same length as {@code claims}.
+   */
+  @Nonnull
+  CompletableFuture<List<Payload>> retrieve(
+      @Nonnull StorageDriverRetrieveContext context, @Nonnull List<StorageDriverClaim> claims);
+}
diff --git a/temporal-sdk/src/main/java/io/temporal/payload/storage/StorageDriverActivityInfo.java b/temporal-sdk/src/main/java/io/temporal/payload/storage/StorageDriverActivityInfo.java
new file mode 100644
index 0000000000..52b2d9e0d5
--- /dev/null
+++ b/temporal-sdk/src/main/java/io/temporal/payload/storage/StorageDriverActivityInfo.java
@@ -0,0 +1,76 @@
+package io.temporal.payload.storage;
+
+import io.temporal.common.Experimental;
+import java.util.Objects;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * Identity of the activity a payload is being stored on behalf of. Provided to a {@link
+ * StorageDriver} via {@link StorageDriverStoreContext#getTarget()}. All fields except {@code
+ * namespace} are best-effort and may be {@code null} when not available at store time.
+ */
+@Experimental
+public final class StorageDriverActivityInfo implements StorageDriverTargetInfo {
+  private final @Nonnull String namespace; // required; null-checked in the constructor
+  private final @Nullable String id;
+  private final @Nullable String runId;
+  private final @Nullable String type;
+
+  /**
+   * @param namespace the activity's namespace; must not be {@code null}
+   * @param id the activity ID, or {@code null} if not available
+   * @param runId the activity run ID (standalone activities), or {@code null} if not available
+   * @param type the activity type name, or {@code null} if not available
+   */
+  public StorageDriverActivityInfo(
+      @Nonnull String namespace,
+      @Nullable String id,
+      @Nullable String runId,
+      @Nullable String type) {
+    this.namespace = Objects.requireNonNull(namespace, "namespace");
+    this.id = id;
+    this.runId = runId;
+    this.type = type;
+  }
+
+  @Nonnull
+  public String getNamespace() {
+    return namespace;
+  }
+
+  @Nullable
+  public String getId() {
+    return id;
+  }
+
+  @Nullable
+  public String getRunId() {
+    return runId;
+  }
+
+  @Nullable
+  public String getType() {
+    return type;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof StorageDriverActivityInfo)) { // also rejects null
+      return false;
+    }
+    StorageDriverActivityInfo that = (StorageDriverActivityInfo) o;
+    return namespace.equals(that.namespace) // safe: namespace is never null
+        && Objects.equals(id, that.id)
+        && Objects.equals(runId, that.runId)
+        && Objects.equals(type, that.type);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(namespace, id, runId, type); // same fields as equals()
+  }
+}
diff --git a/temporal-sdk/src/main/java/io/temporal/payload/storage/StorageDriverClaim.java b/temporal-sdk/src/main/java/io/temporal/payload/storage/StorageDriverClaim.java
new file mode 100644
index 0000000000..21a9cc6f9f
--- /dev/null
+++ b/temporal-sdk/src/main/java/io/temporal/payload/storage/StorageDriverClaim.java
@@ -0,0 +1,44 @@
+package io.temporal.payload.storage;
+
+import io.temporal.common.Experimental;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import javax.annotation.Nonnull;
+
+/**
+ * Driver-defined reference to an externally stored payload, used to retrieve it later.
+ *
+ * @see StorageDriver
+ */
+@Experimental
+public final class StorageDriverClaim {
+  private final @Nonnull Map claimData; // NOTE(review): raw Map -- confirm type arguments
+
+  public StorageDriverClaim(@Nonnull Map claimData) {
+    this.claimData =
+        Collections.unmodifiableMap(new HashMap<>(Objects.requireNonNull(claimData, "claimData")));
+  }
+
+  @Nonnull
+  public Map getClaimData() {
+    return claimData; // read-only view built in the constructor
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof StorageDriverClaim)) { // also rejects null
+      return false;
+    }
+    return claimData.equals(((StorageDriverClaim) o).claimData);
+  }
+
+  @Override
+  public int hashCode() {
+    return claimData.hashCode();
+  }
+}
diff --git a/temporal-sdk/src/main/java/io/temporal/payload/storage/StorageDriverRetrieveContext.java b/temporal-sdk/src/main/java/io/temporal/payload/storage/StorageDriverRetrieveContext.java
new file mode 100644
index 0000000000..d83e03d578
--- /dev/null
+++ b/temporal-sdk/src/main/java/io/temporal/payload/storage/StorageDriverRetrieveContext.java
@@ -0,0 +1,7 @@
+package io.temporal.payload.storage;
+
+import io.temporal.common.Experimental;
+
+/** Context passed to {@link StorageDriver#retrieve}. */
+@Experimental
+public final class StorageDriverRetrieveContext {} // declares no members
diff --git a/temporal-sdk/src/main/java/io/temporal/payload/storage/StorageDriverSelector.java b/temporal-sdk/src/main/java/io/temporal/payload/storage/StorageDriverSelector.java
new file mode 100644
index 0000000000..966e52e68d
--- /dev/null
+++ b/temporal-sdk/src/main/java/io/temporal/payload/storage/StorageDriverSelector.java
@@ -0,0 +1,18 @@
+package io.temporal.payload.storage;
+
+import io.temporal.api.common.v1.Payload;
+import io.temporal.common.Experimental;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/** Chooses which {@link StorageDriver} stores a given payload. */
+@Experimental
+@FunctionalInterface
+public interface StorageDriverSelector {
+  /**
+   * Returns the driver to store {@code payload}, which must be one of the drivers registered in the
+   * {@link ExternalStorage}, or {@code null} to leave the payload stored inline.
+   */
+  @Nullable
+  StorageDriver selectDriver(@Nonnull StorageDriverStoreContext context, @Nonnull Payload payload);
+}
diff --git a/temporal-sdk/src/main/java/io/temporal/payload/storage/StorageDriverStoreContext.java b/temporal-sdk/src/main/java/io/temporal/payload/storage/StorageDriverStoreContext.java
new file mode 100644
index 0000000000..732a7de08a
--- /dev/null
+++ b/temporal-sdk/src/main/java/io/temporal/payload/storage/StorageDriverStoreContext.java
@@ -0,0 +1,19 @@
+package io.temporal.payload.storage;
+
+import io.temporal.common.Experimental;
+import javax.annotation.Nullable;
+
+/** Context passed to {@link StorageDriver#store} and {@link StorageDriverSelector}. */
+@Experimental
+public final class StorageDriverStoreContext {
+  private final @Nullable StorageDriverTargetInfo target;
+
+  public StorageDriverStoreContext(@Nullable StorageDriverTargetInfo target) {
+    this.target = target; // stored as-is; null is accepted
+  }
+
+  @Nullable
+  public StorageDriverTargetInfo getTarget() {
+    return target;
+  }
+}
diff --git a/temporal-sdk/src/main/java/io/temporal/payload/storage/StorageDriverTargetInfo.java b/temporal-sdk/src/main/java/io/temporal/payload/storage/StorageDriverTargetInfo.java
new file mode 100644
index 0000000000..f70bc08ed1
--- /dev/null
+++ b/temporal-sdk/src/main/java/io/temporal/payload/storage/StorageDriverTargetInfo.java
@@ -0,0 +1,11 @@
+package io.temporal.payload.storage;
+
+import io.temporal.common.Experimental;
+
+/**
+ * Identity of the workflow or activity a payload is being stored on behalf of. Provided on a
+ * best-effort basis on the storing side only; some fields may be absent. Implemented by {@link
+ * StorageDriverWorkflowInfo} and {@link StorageDriverActivityInfo}.
+ */
+@Experimental
+public interface StorageDriverTargetInfo {} // declares no methods
diff --git a/temporal-sdk/src/main/java/io/temporal/payload/storage/StorageDriverWorkflowInfo.java b/temporal-sdk/src/main/java/io/temporal/payload/storage/StorageDriverWorkflowInfo.java
new file mode 100644
index 0000000000..455025a8b0
--- /dev/null
+++ b/temporal-sdk/src/main/java/io/temporal/payload/storage/StorageDriverWorkflowInfo.java
@@ -0,0 +1,76 @@
+package io.temporal.payload.storage;
+
+import io.temporal.common.Experimental;
+import java.util.Objects;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * Identity of the workflow a payload is being stored on behalf of. Provided to a {@link
+ * StorageDriver} via {@link StorageDriverStoreContext#getTarget()}. All fields except {@code
+ * namespace} are best-effort and may be {@code null} when not available at store time.
+ */
+@Experimental
+public final class StorageDriverWorkflowInfo implements StorageDriverTargetInfo {
+  private final @Nonnull String namespace; // required; null-checked in the constructor
+  private final @Nullable String id;
+  private final @Nullable String runId;
+  private final @Nullable String type;
+
+  /**
+   * @param namespace the workflow's namespace; must not be {@code null}
+   * @param id the workflow ID, or {@code null} if not available
+   * @param runId the workflow run ID, or {@code null} if not available
+   * @param type the workflow type name, or {@code null} if not available
+   */
+  public StorageDriverWorkflowInfo(
+      @Nonnull String namespace,
+      @Nullable String id,
+      @Nullable String runId,
+      @Nullable String type) {
+    this.namespace = Objects.requireNonNull(namespace, "namespace");
+    this.id = id;
+    this.runId = runId;
+    this.type = type;
+  }
+
+  @Nonnull
+  public String getNamespace() {
+    return namespace;
+  }
+
+  @Nullable
+  public String getId() {
+    return id;
+  }
+
+  @Nullable
+  public String getRunId() {
+    return runId;
+  }
+
+  @Nullable
+  public String getType() {
+    return type;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof StorageDriverWorkflowInfo)) { // also rejects null
+      return false;
+    }
+    StorageDriverWorkflowInfo that = (StorageDriverWorkflowInfo) o;
+    return namespace.equals(that.namespace) // safe: namespace is never null
+        && Objects.equals(id, that.id)
+        && Objects.equals(runId, that.runId)
+        && Objects.equals(type, that.type);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(namespace, id, runId, type); // same fields as equals()
+  }
+}
diff --git a/temporal-sdk/src/test/java/io/temporal/payload/storage/ExternalStorageTest.java b/temporal-sdk/src/test/java/io/temporal/payload/storage/ExternalStorageTest.java
new file mode 100644
index 0000000000..efe81f6427
--- /dev/null
+++ b/temporal-sdk/src/test/java/io/temporal/payload/storage/ExternalStorageTest.java
@@ -0,0 +1,98 @@
+package io.temporal.payload.storage;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+
+import io.temporal.api.common.v1.Payload;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.junit.Test;
+
+public class ExternalStorageTest {
+
+  private static StorageDriver driver(String name) {
+    return new StorageDriver() {
+      @Override
+      public String getName() {
+        return name;
+      }
+
+      @Override
+      public String getType() {
+        return "test";
+      }
+
+      @Override
+      public CompletableFuture<List<StorageDriverClaim>> store(
+          StorageDriverStoreContext context, List<Payload> payloads) {
+        throw new UnsupportedOperationException(); // builder tests never store payloads
+      }
+
+      @Override
+      public CompletableFuture<List<Payload>> retrieve(
+          StorageDriverRetrieveContext context, List<StorageDriverClaim> claims) {
+        throw new UnsupportedOperationException(); // builder tests never retrieve payloads
+      }
+    };
+  }
+
+  @Test
+  public void singleDriverNoSelectorSynthesizesSelector() {
+    StorageDriver a = driver("a");
+    ExternalStorage storage = ExternalStorage.newBuilder().setDriver(a).build();
+    assertEquals(1, storage.getDrivers().size());
+    StorageDriverSelector selector = storage.getDriverSelector();
+    assertNotNull(selector);
+    assertSame(
+        a,
+        selector.selectDriver(new StorageDriverStoreContext(null), Payload.getDefaultInstance()));
+  }
+
+  @Test
+  public void multipleDriversWithSelectorIsValid() {
+    StorageDriver a = driver("a");
+    ExternalStorage storage =
+        ExternalStorage.newBuilder()
+            .setDrivers(Arrays.asList(a, driver("b")))
+            .setDriverSelector((context, payload) -> a)
+            .build();
+    assertEquals(2, storage.getDrivers().size());
+    assertNotNull(storage.getDriverSelector());
+  }
+
+  @Test
+  public void zeroThresholdStoresAll() {
+    ExternalStorage storage =
+        ExternalStorage.newBuilder()
+            .setDrivers(Collections.singletonList(driver("a")))
+            .setPayloadSizeThreshold(0)
+            .build();
+    assertEquals(0, storage.getPayloadSizeThreshold());
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void noDriversRejected() {
+    ExternalStorage.newBuilder().build();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void duplicateDriverNamesRejected() {
+    ExternalStorage.newBuilder().setDrivers(Arrays.asList(driver("dup"), driver("dup"))).build();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void multipleDriversRequireSelector() {
+    ExternalStorage.newBuilder().setDrivers(Arrays.asList(driver("a"), driver("b"))).build();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void negativeThresholdRejected() {
+    ExternalStorage.newBuilder()
+        .setDrivers(Collections.singletonList(driver("a")))
+        .setPayloadSizeThreshold(-1)
+        .build();
+  }
+}