diff --git a/pom.xml b/pom.xml index 7239c5a2f6..16e6770ef7 100644 --- a/pom.xml +++ b/pom.xml @@ -203,6 +203,11 @@ software.amazon.awssdk sts + + + software.amazon.awssdk + s3 + com.datastax.oss java-driver-core @@ -300,6 +305,13 @@ junit-jupiter test + + + com.adobe.testing + s3mock-testcontainers + 5.1.0 + test + com.github.docker-java docker-java-api diff --git a/src/main/java/io/stargate/sgv2/jsonapi/config/BillingS3ExportConfig.java b/src/main/java/io/stargate/sgv2/jsonapi/config/BillingS3ExportConfig.java new file mode 100644 index 0000000000..4c242ca9c8 --- /dev/null +++ b/src/main/java/io/stargate/sgv2/jsonapi/config/BillingS3ExportConfig.java @@ -0,0 +1,81 @@ +package io.stargate.sgv2.jsonapi.config; + +import io.smallrye.config.ConfigMapping; +import io.smallrye.config.WithDefault; +import java.time.Duration; +import java.util.Optional; + +/** + * Configuration for exporting {@code billing.events} log lines to S3 as NDJSON {@code .jsonl} + * objects. Consumed by {@link io.stargate.sgv2.jsonapi.service.provider.BillingS3HandlerInstaller} + * which, when {@link #enabled()} is {@code true}, attaches a {@link + * io.stargate.sgv2.jsonapi.service.provider.BillingS3LogHandler} to the {@code billing.events} + * logger. + * + *

This is a startup-time switch, not a per-request feature flag — it is independent of + * {@link io.stargate.sgv2.jsonapi.config.feature.ApiFeature#BILLING_EVENTS_LOGGING}. Events only + * reach the handler when the {@code billing.events} logger is also emitting (i.e. the billing + * feature is on); this flag then decides whether those lines are additionally shipped to S3. The + * existing console handler stays attached as a backstop regardless. + * + *

Off by default. When enabled, {@link #bucket()} and {@link #bucketRegion()} are + * required; if either is missing the handler is not installed (logged as an error) and billing + * events continue to flow to the console only. + */ +@ConfigMapping(prefix = "stargate.jsonapi.billing.s3") +public interface BillingS3ExportConfig { + + /** + * Master switch; when {@code false} (default) no handler is installed and no S3 client is built. + */ + @WithDefault("false") + boolean enabled(); + + /** Target bucket, e.g. {@code serverless-usage-dev}. Required when {@link #enabled()}. */ + Optional bucket(); + + /** AWS region of the bucket, e.g. {@code us-east-1}. Required when {@link #enabled()}. */ + Optional bucketRegion(); + + /** + * Endpoint override for the S3 client. Set this to point at a non-AWS S3 (e.g. S3Mock in tests); + * when present, path-style addressing is forced. SDK resolves the regional AWS endpoint when left + * empty. + */ + Optional endpointOverride(); + + /** Seal a batch once it holds this many events. */ + @WithDefault("50") + int maxEvents(); + + /** Seal a batch once its NDJSON body reaches this many bytes (~2 MiB default). */ + @WithDefault("2097152") + long maxBytes(); + + /** Seal an open (under-filled) batch once its oldest event is this old (flush interval). */ + @WithDefault("PT30S") + Duration maxAge(); + + /** + * Capacity of the handler's in-memory hand-off queue. {@link BillingS3LogHandler#publish()} + * offers lines non-blocking; once the queue is full, further lines are dropped and counted. + */ + @WithDefault("10000") + int queueCapacity(); + + /** Maximum number of PUT attempts per sealed batch before it is counted as failed. */ + @WithDefault("3") + int maxUploadAttempts(); + + /** Base delay for exponential backoff between PUT attempts ({@code base * 2^(attempt-1)}). */ + @WithDefault("PT0.2S") + Duration retryBaseBackoff(); + + /** Jitter factor [0,1] on retry back-off (0 = none). Only applies when retryBaseBackoff > 0. */ + @WithDefault("0.5") + double retryJitter(); + + /** Number of batch uploads (S3 PUTs) allowed in flight concurrently. */ + @WithDefault("4") + int uploadConcurrency(); +} diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/provider/BillingS3LogHandler.java b/src/main/java/io/stargate/sgv2/jsonapi/service/provider/BillingS3LogHandler.java new file mode 100644 index 0000000000..c732bb4cf2 --- /dev/null +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/provider/BillingS3LogHandler.java @@ -0,0 +1,352 @@ +package io.stargate.sgv2.jsonapi.service.provider; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.micrometer.core.instrument.MeterRegistry; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.subscription.BackPressureStrategy; +import io.smallrye.mutiny.subscription.Cancellable; +import io.smallrye.mutiny.subscription.MultiEmitter; +import io.stargate.sgv2.jsonapi.config.BillingS3ExportConfig; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Handler; +import java.util.logging.LogRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link Handler} that ships {@code billing.events} JSON log lines to S3 as NDJSON ({@code + * .jsonl}) objects. Installed on the {@code billing.events} logger by {@link + * BillingS3HandlerInstaller} when {@link BillingS3ExportConfig#enabled()} is {@code true}; the + * existing console handler stays attached as a backstop (dual-write). + * + *

Off the request path. {@link #publish(LogRecord)} only hands the line to an internal + * pipeline — it never blocks and never throws. The pipeline holds a bounded in-memory backlog + * ({@link BillingS3ExportConfig#queueCapacity()}) so transient bursts are absorbed and drained as + * S3 catches up; only when that backlog is full is a line dropped and counted (never silent). The + * pipeline batches lines and seals a batch on {@link BillingS3ExportConfig#maxEvents()} / {@link + * BillingS3ExportConfig#maxBytes()} / {@link BillingS3ExportConfig#maxAge()}, then PUTs it with + * bounded retry/backoff, up to {@link BillingS3ExportConfig#uploadConcurrency()} uploads in flight. + * + *

Verbatim bodies. Each log line is kept byte-for-byte as one NDJSON row — only {@code + * timestamp} is parsed out (for the key's date path); there is no re-serialization. Each sealed + * batch is one object at {@code ///

///.jsonl}; the key is built + * once and reused across retries so a retried PUT overwrites rather than duplicates (downstream + * also dedups on each event id). + * + *

{@link #close()} drains in-flight batches (bounded) and closes the uploader. The handler is + * intentionally not a CDI bean — the installer wires this instance to the {@code + * billing.events} category explicitly. + */ +public final class BillingS3LogHandler extends Handler { + + // ---- Constants ---- + // S3 object-key consistent identifier; TBD + static final String PATH_PREFIX = "billing-events"; + private static final Logger LOG = LoggerFactory.getLogger(BillingS3LogHandler.class); + private static final ObjectMapper MAPPER = new ObjectMapper(); + // UTC, minute-resolution date path for the object key + private static final DateTimeFormatter KEY_TIME_FORMAT = + DateTimeFormatter.ofPattern("yyyy/MM/dd/HH/mm").withZone(ZoneOffset.UTC); + // How long {@link #close()} waits for in-flight batches to drain before cancelling. + private static final long SHUTDOWN_DRAIN_TIMEOUT_MILLIS = 15_000L; + + // ---- Collaborators ---- + private final AsyncBatchUploader uploader; + private final BillingMetrics metrics; + + // ---- Tuning (resolved from BillingS3ExportConfig) ---- + private final int maxEvents; + private final long maxBytes; + private final long maxAgeNanos; + private final int maxUploadAttempts; + private final long retryBaseBackoffMillis; + private final double retryJitter; + private final long queueCapacity; + private final int uploadConcurrency; + + // ---- Reactive pipeline ---- + private volatile MultiEmitter emitter; + private final Cancellable subscription; + private final CountDownLatch terminated = new CountDownLatch(1); + + // ---- Mutable in-flight state ---- + /** + * Events accepted by {@link #publish} but not yet delivered or failed — the in-memory backlog + * depth and the bounded-buffer gate. {@code publish} CAS-checks it against {@link + * #queueCapacity}; {@link BillingMetrics} exposes it as the {@code billing.s3.queue.depth} gauge. + */ + private final AtomicLong backlogEvents = new AtomicLong(0); + + /** + * Current open (unsealed) batch — accumulation state carried across calls. No lock needed: the + * upstream {@code SerializedMultiEmitter} serializes onItem, so {@link #accumulate} and {@link + * #flushOpenBatch} never run concurrently. + */ + private Batch openBatch; + + /** Config-driven constructor used by the installer. */ + public BillingS3LogHandler( + BillingS3ExportConfig config, AsyncBatchUploader uploader, MeterRegistry meterRegistry) { + this( + uploader, + meterRegistry, + config.maxEvents(), + config.maxBytes(), + config.maxAge(), + config.maxUploadAttempts(), + config.retryBaseBackoff(), + config.retryJitter(), + config.queueCapacity(), + config.uploadConcurrency()); + } + + /** Explicit-threshold constructor; convenient for unit tests. */ + BillingS3LogHandler( + AsyncBatchUploader uploader, + MeterRegistry meterRegistry, + int maxEvents, + long maxBytes, + Duration maxAge, + int maxUploadAttempts, + Duration retryBaseBackoff, + double retryJitter, + int queueCapacity, + int uploadConcurrency) { + this.uploader = uploader; + this.maxEvents = Math.max(1, maxEvents); + this.maxBytes = Math.max(1L, maxBytes); + this.maxAgeNanos = Math.max(1L, maxAge.toNanos()); + this.maxUploadAttempts = Math.max(1, maxUploadAttempts); + this.retryBaseBackoffMillis = Math.max(0L, retryBaseBackoff.toMillis()); + this.retryJitter = Math.clamp(retryJitter, 0.0, 1.0); + this.queueCapacity = Math.max(1L, queueCapacity); + this.uploadConcurrency = Math.max(1, uploadConcurrency); + this.metrics = new BillingMetrics(meterRegistry, backlogEvents, this.queueCapacity); + + this.subscription = + Multi.createFrom() + .emitter(em -> this.emitter = em, BackPressureStrategy.BUFFER) + .onItem() + .transform(this::parse) + .onItem() + .transformToMultiAndConcatenate(this::accumulate) + .onCompletion() + .switchTo(this::flushOpenBatch) + .onItem() + .transformToUni(this::uploadWithRetry) + .merge(this.uploadConcurrency) + .onTermination() + .invoke(() -> terminated.countDown()) + .subscribe() + .with( + ignored -> {}, + failure -> LOG.error("Billing S3 export pipeline terminated", failure)); + } + + // ============================================================ + // java.util.logging.Handler + // ============================================================ + + @Override + public void publish(LogRecord record) { + MultiEmitter e = this.emitter; + if (record == null) { + return; + } + String line = record.getMessage(); + if (e == null || line == null || line.isEmpty()) { + return; + } + metrics.recordOffered(); + // Bounded backlog: accept while under capacity (absorbing bursts); once full, shed and count — + // never a silent drop. The CAS keeps the bound exact under concurrent publish(). + long current; + do { + current = backlogEvents.get(); + if (current >= queueCapacity) { + metrics.recordDropped(); + return; + } + } while (!backlogEvents.compareAndSet(current, current + 1)); + e.emit(line); + } + + @Override + public void flush() { + // No-op: the pipeline ships continuously; close() handles the final seal-everything on + // shutdown. + } + + @Override + public void close() { + MultiEmitter e = this.emitter; + if (e != null) { + e.complete(); + } + try { + if (!terminated.await(SHUTDOWN_DRAIN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + LOG.warn( + "Billing S3 export did not drain within {} ms on shutdown; cancelling", + SHUTDOWN_DRAIN_TIMEOUT_MILLIS); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + if (subscription != null) { + subscription.cancel(); + } + try { + uploader.close(); + } catch (Exception ex) { + LOG.warn("Error closing billing S3 uploader", ex); + } + } + + // ============================================================ + // Batching + // ============================================================ + + /** + * Sequential fold of one parsed row into {@link #openBatch}, emitting 0–2 sealed {@link Batch}es + * (sealed on {@code maxEvents}/{@code maxBytes}, or the prior batch on {@code maxAge} at the next + * arrival). Kept ordered and one-at-a-time via {@code …AndConcatenate}, not merge. + */ + private Multi accumulate(Parsed parsed) { + List sealed = new ArrayList<>(2); + Batch batch = openBatch; + if (batch != null && System.nanoTime() - batch.firstNanos >= maxAgeNanos) { + sealed.add(batch); + batch = null; + openBatch = null; + } + if (batch == null) { + batch = new Batch(parsed.timestamp()); + openBatch = batch; + } + batch.add(parsed.line()); + if (batch.events >= maxEvents || batch.bytes >= maxBytes) { + sealed.add(batch); + openBatch = null; + } + return Multi.createFrom().iterable(sealed); + } + + /** Emits the final open batch (if any) when the stream completes. */ + private Multi flushOpenBatch() { + Batch remaining = openBatch; + openBatch = null; + return remaining == null ? Multi.createFrom().empty() : Multi.createFrom().item(remaining); + } + + /** + * Uploads one sealed batch with bounded retry/backoff/jitter. Never propagates failure: a + * giving-up batch is counted and recovered to a no-op so the pipeline stays alive. + */ + private Uni uploadWithRetry(Batch batch) { + String key = objectKey(batch.firstTimestamp, UUID.randomUUID()); + byte[] body = batch.body(); + + Uni put = Uni.createFrom().completionStage(() -> uploader.upload(key, body)); + if (maxUploadAttempts > 1) { + var retry = put.onFailure().retry(); + put = + retryBaseBackoffMillis > 0 + ? retry + .withBackOff(Duration.ofMillis(retryBaseBackoffMillis)) + .withJitter(retryJitter) + .atMost(maxUploadAttempts - 1) + : retry.atMost(maxUploadAttempts - 1); + } + + return put.onItem() + .invoke( + () -> { + metrics.recordBatchDelivered(batch.events); + backlogEvents.addAndGet(-batch.events); + }) + .onFailure() + .invoke( + t -> LOG.error("Giving up on billing S3 batch '{}' ({} events)", key, batch.events, t)) + .onFailure() + .recoverWithItem( + () -> { + metrics.recordBatchFailed(batch.events); + backlogEvents.addAndGet(-batch.events); + return null; + }); + } + + /** Object key: {@code ///

///.jsonl} (UTC). */ + static String objectKey(Instant timestamp, UUID id) { + return PATH_PREFIX + "/" + KEY_TIME_FORMAT.format(timestamp) + "/" + id + ".jsonl"; + } + + /** Parses the {@code timestamp} for the key's date path; keeps the verbatim line. */ + private Parsed parse(String line) { + try { + JsonNode node = MAPPER.readTree(line); + JsonNode tsNode = node.get("timestamp"); + if (tsNode != null && tsNode.isTextual()) { + return new Parsed(line, Instant.parse(tsNode.asText())); + } + } catch (Exception e) { + // fall through to the wall-clock fallback below + } + metrics.recordParseFailure(); + return new Parsed(line, Instant.now()); + } + + private record Parsed(String line, Instant timestamp) {} + + /** A growing set of verbatim NDJSON rows. */ + private static final class Batch { + private final Instant firstTimestamp; + private final long firstNanos = System.nanoTime(); + private final List lines = new ArrayList<>(); + private int events = 0; + private long bytes = 0; + + Batch(Instant firstTimestamp) { + this.firstTimestamp = firstTimestamp; + } + + void add(String line) { + lines.add(line); + events++; + bytes += line.getBytes(StandardCharsets.UTF_8).length + 1L; // +1 for the newline + } + + byte[] body() { + StringBuilder sb = new StringBuilder((int) Math.min(Integer.MAX_VALUE, bytes + events)); + for (String line : lines) { + sb.append(line).append('\n'); + } + return sb.toString().getBytes(StandardCharsets.UTF_8); + } + } + + /** + * Single-attempt async uploader of one sealed batch (test seam); the production implementation is + * {@link S3BatchUploader}. A failed {@link CompletionStage} triggers the handler's retry/backoff. + */ + @FunctionalInterface + public interface AsyncBatchUploader extends AutoCloseable { + CompletionStage upload(String key, byte[] body); + + @Override + default void close() {} + } +}