From 0f8bf4005fbd73ee8c7267c2b86e96637528aa13 Mon Sep 17 00:00:00 2001
From: Ahmed Muhsin
Date: Wed, 3 Jun 2026 14:16:08 -0500
Subject: [PATCH 01/10] Add HttpProxyServer + Coordinator infrastructure
Introduce a new com.microsoft.azure.functions.worker.http package that lays the foundation for receiving HTTP-triggered invocations directly from the Functions host via the HttpUri capability (parity with the Go, Python, and .NET isolated workers).
* HttpProxyServer wraps the JDK built-in com.sun.net.httpserver.HttpServer. Binds to 127.0.0.1 on an ephemeral port, dispatches requests on a cached executor mirroring the existing gRPC dispatch pool (unbounded growth, daemon threads, 15s shutdown drain). No new third-party dependencies.
* HttpInvocationCoordinator synchronizes the two halves of each invocation - the HTTP request and the gRPC InvocationRequest - via a ConcurrentMap of per-invocation slots keyed by invocationId. Each slot exposes CompletableFutures for HTTP arrival, gRPC arrival, and completion.
* ProxyConfig captures bind address/port only. No body-size or timeout caps; matches Go/Python/.NET behavior of relying on the platform for those guards.
Pure infrastructure commit: nothing is wired into the worker lifecycle yet. 15 new unit tests cover both rendezvous orderings, slot release/failure semantics, ephemeral port binding, request routing, and close-after-start behavior.
---
.../http/HttpInvocationCoordinator.java | 103 +++++++++++++
.../worker/http/HttpInvocationSlot.java | 52 +++++++
.../worker/http/HttpProxyServer.java | 126 ++++++++++++++++
.../functions/worker/http/ProxyConfig.java | 43 ++++++
.../http/HttpInvocationCoordinatorTest.java | 135 ++++++++++++++++++
.../worker/http/HttpProxyServerTest.java | 115 +++++++++++++++
6 files changed, 574 insertions(+)
create mode 100644 src/main/java/com/microsoft/azure/functions/worker/http/HttpInvocationCoordinator.java
create mode 100644 src/main/java/com/microsoft/azure/functions/worker/http/HttpInvocationSlot.java
create mode 100644 src/main/java/com/microsoft/azure/functions/worker/http/HttpProxyServer.java
create mode 100644 src/main/java/com/microsoft/azure/functions/worker/http/ProxyConfig.java
create mode 100644 src/test/java/com/microsoft/azure/functions/worker/http/HttpInvocationCoordinatorTest.java
create mode 100644 src/test/java/com/microsoft/azure/functions/worker/http/HttpProxyServerTest.java
diff --git a/src/main/java/com/microsoft/azure/functions/worker/http/HttpInvocationCoordinator.java b/src/main/java/com/microsoft/azure/functions/worker/http/HttpInvocationCoordinator.java
new file mode 100644
index 0000000..fbe47de
--- /dev/null
+++ b/src/main/java/com/microsoft/azure/functions/worker/http/HttpInvocationCoordinator.java
@@ -0,0 +1,103 @@
+package com.microsoft.azure.functions.worker.http;
+
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.microsoft.azure.functions.rpc.messages.InvocationRequest;
+import com.sun.net.httpserver.HttpExchange;
+
+/**
+ * Synchronizes the two halves of an HTTP-proxied invocation by invocation id.
+ *
+ *
The Functions host dispatches each HTTP-triggered invocation as two
+ * messages that arrive on independent channels:
+ *
+ *
An HTTP request forwarded to the worker's embedded proxy server
+ * (delivered to a worker thread inside the JDK HttpServer pool).
+ *
A gRPC {@code InvocationRequest} carrying trigger metadata and
+ * the matching {@code invocationId}.
+ *
+ *
+ *
This coordinator owns the per-invocation slot, exposes rendezvous methods
+ * that block until the other half arrives, and releases the slot when the
+ * invocation completes. Slot lookup and creation are atomic so the two halves
+ * can race without losing one another.
+ *
+ *
The coordinator does not impose timeouts: the host owns end-to-end
+ * timeout enforcement, and per-invocation hangs are observable via the worker's
+ * existing health and watchdog telemetry.
+ */
+public final class HttpInvocationCoordinator {
+ private final ConcurrentMap slots = new ConcurrentHashMap<>();
+
+ /**
+ * Registers the arrival of an HTTP request for the given invocation.
+ * If the matching gRPC dispatch has not yet arrived, the returned future
+ * resolves once it does. Called by the HTTP proxy handler.
+ */
+ public CompletableFuture registerHttpArrival(String invocationId, HttpExchange exchange) {
+ Objects.requireNonNull(invocationId, "invocationId");
+ Objects.requireNonNull(exchange, "exchange");
+ HttpInvocationSlot slot = slots.computeIfAbsent(invocationId, HttpInvocationSlot::new);
+ if (!slot.httpArrival().complete(exchange)) {
+ throw new IllegalStateException(
+ "HTTP arrival already registered for invocation " + invocationId);
+ }
+ return slot.grpcArrival();
+ }
+
+ /**
+ * Registers the arrival of a gRPC InvocationRequest for the given invocation.
+ * If the matching HTTP request has not yet arrived, the returned future
+ * resolves once it does. Called by the gRPC invocation dispatcher.
+ */
+ public CompletableFuture registerGrpcArrival(InvocationRequest request) {
+ Objects.requireNonNull(request, "request");
+ String invocationId = request.getInvocationId();
+ HttpInvocationSlot slot = slots.computeIfAbsent(invocationId, HttpInvocationSlot::new);
+ if (!slot.grpcArrival().complete(request)) {
+ throw new IllegalStateException(
+ "gRPC arrival already registered for invocation " + invocationId);
+ }
+ return slot.httpArrival();
+ }
+
+ /**
+ * Marks the invocation as complete and removes its slot. Idempotent. Any
+ * outstanding rendezvous futures are cancelled to unblock callers.
+ */
+ public void releaseInvocation(String invocationId) {
+ Objects.requireNonNull(invocationId, "invocationId");
+ HttpInvocationSlot slot = slots.remove(invocationId);
+ if (slot == null) {
+ return;
+ }
+ slot.httpArrival().cancel(false);
+ slot.grpcArrival().cancel(false);
+ slot.completion().complete(null);
+ }
+
+ /**
+ * Fails the invocation slot with the given throwable. Used when the worker
+ * decides to abort an in-flight invocation (e.g., HTTP handler exception
+ * before the user function runs). The slot is removed after failure.
+ */
+ public void failInvocation(String invocationId, Throwable cause) {
+ Objects.requireNonNull(invocationId, "invocationId");
+ Objects.requireNonNull(cause, "cause");
+ HttpInvocationSlot slot = slots.remove(invocationId);
+ if (slot == null) {
+ return;
+ }
+ slot.httpArrival().completeExceptionally(cause);
+ slot.grpcArrival().completeExceptionally(cause);
+ slot.completion().completeExceptionally(cause);
+ }
+
+ /** Visible for tests. */
+ int activeInvocationCount() {
+ return slots.size();
+ }
+}
diff --git a/src/main/java/com/microsoft/azure/functions/worker/http/HttpInvocationSlot.java b/src/main/java/com/microsoft/azure/functions/worker/http/HttpInvocationSlot.java
new file mode 100644
index 0000000..7a5e309
--- /dev/null
+++ b/src/main/java/com/microsoft/azure/functions/worker/http/HttpInvocationSlot.java
@@ -0,0 +1,52 @@
+package com.microsoft.azure.functions.worker.http;
+
+import java.util.concurrent.CompletableFuture;
+
+import com.microsoft.azure.functions.rpc.messages.InvocationRequest;
+import com.sun.net.httpserver.HttpExchange;
+
+/**
+ * Holds the rendezvous state for a single in-flight HTTP-proxied invocation.
+ *
+ *
The Functions host delivers an invocation along two independent paths:
+ *
+ *
An HTTP request forwarded to the worker's proxy server, carrying the
+ * request body and headers.
+ *
A gRPC {@code InvocationRequest} carrying trigger metadata, route
+ * parameters, and the {@code invocationId} used to correlate the two.
+ *
+ *
+ *
Either side may arrive first. The slot exposes futures that the HTTP
+ * handler thread and the gRPC dispatcher thread wait on. The {@code completion}
+ * future is signaled once the invocation has fully responded, allowing the slot
+ * to be released from the coordinator's map.
+ *
+ *
Instances are package-private; use {@link HttpInvocationCoordinator} to
+ * acquire and release slots.
Backed by {@link com.sun.net.httpserver.HttpServer}, a JDK built-in
+ * since Java 6, so the worker takes on no new runtime dependencies.
+ *
+ *
The server binds to the loopback address on an ephemeral port and is
+ * started by {@link #start(HttpHandler)} with a single root handler.
+ * Worker threads come from a cached executor that mirrors the gRPC dispatch
+ * pool: unbounded growth, named for diagnostics, 15 s drain on shutdown.
+ * Capping concurrency is left to the platform, matching the Go, Python, and
+ * .NET isolated workers.
+ */
+public final class HttpProxyServer implements AutoCloseable {
+ private static final long EXECUTOR_SHUTDOWN_SECONDS = 15L;
+ private static final long SERVER_STOP_SECONDS = 5L;
+
+ private final ProxyConfig config;
+ private final AtomicBoolean started = new AtomicBoolean(false);
+
+ private HttpServer server;
+ private ExecutorService executor;
+ private String boundUri;
+
+ public HttpProxyServer(ProxyConfig config) {
+ this.config = Objects.requireNonNull(config, "config");
+ }
+
+ /**
+ * Binds the server, attaches {@code rootHandler} to {@code "/"}, and starts
+ * serving requests. Returns the absolute {@code http://host:port} URI that
+ * should be advertised to the Functions host via the {@code HttpUri}
+ * capability.
+ *
+ * @throws IllegalStateException if start has already been called
+ * @throws IOException if the server cannot bind
+ */
+ public synchronized String start(HttpHandler rootHandler) throws IOException {
+ Objects.requireNonNull(rootHandler, "rootHandler");
+ if (!started.compareAndSet(false, true)) {
+ throw new IllegalStateException("HttpProxyServer already started");
+ }
+ InetSocketAddress bindAddress = new InetSocketAddress(
+ config.getBindAddress(), config.getBindPort());
+ // Backlog 0 → JDK default.
+ this.server = HttpServer.create(bindAddress, 0);
+ this.executor = Executors.newCachedThreadPool(new ProxyThreadFactory());
+ this.server.setExecutor(this.executor);
+ this.server.createContext("/", rootHandler);
+ this.server.start();
+ InetSocketAddress actual = this.server.getAddress();
+ this.boundUri = "http://" + actual.getHostString() + ":" + actual.getPort();
+ WorkerLogManager.getSystemLogger().log(Level.INFO,
+ "HTTP proxy server bound to " + boundUri);
+ return boundUri;
+ }
+
+ /**
+ * Returns the URI the server is listening on, or {@code null} if the
+ * server has not been started.
+ */
+ public String getBoundUri() {
+ return boundUri;
+ }
+
+ @Override
+ public synchronized void close() {
+ if (!started.compareAndSet(true, false)) {
+ return;
+ }
+ if (server != null) {
+ try {
+ // Allow in-flight requests up to SERVER_STOP_SECONDS to drain.
+ server.stop((int) SERVER_STOP_SECONDS);
+ } catch (RuntimeException ex) {
+ WorkerLogManager.getSystemLogger().log(Level.WARNING,
+ "Error stopping HTTP proxy server", ex);
+ }
+ server = null;
+ }
+ if (executor != null) {
+ executor.shutdown();
+ try {
+ if (!executor.awaitTermination(EXECUTOR_SHUTDOWN_SECONDS, TimeUnit.SECONDS)) {
+ executor.shutdownNow();
+ }
+ } catch (InterruptedException ex) {
+ executor.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ executor = null;
+ }
+ boundUri = null;
+ }
+
+ /**
+ * Thread factory that names worker threads for diagnostics. Daemon threads
+ * so they do not block JVM shutdown if the server is not explicitly closed.
+ */
+ private static final class ProxyThreadFactory implements java.util.concurrent.ThreadFactory {
+ private final java.util.concurrent.atomic.AtomicInteger counter = new java.util.concurrent.atomic.AtomicInteger();
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, "functions-http-proxy-" + counter.incrementAndGet());
+ t.setDaemon(true);
+ return t;
+ }
+ }
+}
diff --git a/src/main/java/com/microsoft/azure/functions/worker/http/ProxyConfig.java b/src/main/java/com/microsoft/azure/functions/worker/http/ProxyConfig.java
new file mode 100644
index 0000000..78f31d6
--- /dev/null
+++ b/src/main/java/com/microsoft/azure/functions/worker/http/ProxyConfig.java
@@ -0,0 +1,43 @@
+package com.microsoft.azure.functions.worker.http;
+
+import java.util.Objects;
+
+/**
+ * Configuration for the embedded HTTP proxy server used to receive HTTP-triggered
+ * invocations directly from the Functions host (HttpUri capability).
+ *
+ *
The configuration deliberately does not impose request body size limits or
+ * per-request timeouts. The Functions front-end (nginx) enforces an upstream
+ * ceiling, and per-worker overload is managed by the platform — matching the
+ * behavior of the Go, Python, and .NET isolated workers.
+ */
+public final class ProxyConfig {
+ /** Loopback bind address. Other workers also bind to 127.0.0.1 only. */
+ public static final String DEFAULT_BIND_ADDRESS = "127.0.0.1";
+
+ /** Ephemeral port. The OS picks an unused port at bind time. */
+ public static final int DEFAULT_BIND_PORT = 0;
+
+ private final String bindAddress;
+ private final int bindPort;
+
+ public ProxyConfig(String bindAddress, int bindPort) {
+ this.bindAddress = Objects.requireNonNull(bindAddress, "bindAddress");
+ if (bindPort < 0 || bindPort > 65535) {
+ throw new IllegalArgumentException("bindPort out of range: " + bindPort);
+ }
+ this.bindPort = bindPort;
+ }
+
+ public static ProxyConfig defaults() {
+ return new ProxyConfig(DEFAULT_BIND_ADDRESS, DEFAULT_BIND_PORT);
+ }
+
+ public String getBindAddress() {
+ return bindAddress;
+ }
+
+ public int getBindPort() {
+ return bindPort;
+ }
+}
diff --git a/src/test/java/com/microsoft/azure/functions/worker/http/HttpInvocationCoordinatorTest.java b/src/test/java/com/microsoft/azure/functions/worker/http/HttpInvocationCoordinatorTest.java
new file mode 100644
index 0000000..06a6318
--- /dev/null
+++ b/src/test/java/com/microsoft/azure/functions/worker/http/HttpInvocationCoordinatorTest.java
@@ -0,0 +1,135 @@
+package com.microsoft.azure.functions.worker.http;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.microsoft.azure.functions.rpc.messages.InvocationRequest;
+import com.sun.net.httpserver.HttpExchange;
+
+import org.junit.jupiter.api.Test;
+
+public class HttpInvocationCoordinatorTest {
+
+ private static final String INVOCATION_ID = "abc-123";
+
+ @Test
+ public void httpArrivesBeforeGrpc() throws Exception {
+ HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
+ HttpExchange exchange = mock(HttpExchange.class);
+ InvocationRequest request = InvocationRequest.newBuilder().setInvocationId(INVOCATION_ID).build();
+
+ CompletableFuture grpcFuture = coordinator.registerHttpArrival(INVOCATION_ID, exchange);
+ assertFalse(grpcFuture.isDone(), "gRPC future should still be pending before gRPC arrival");
+
+ CompletableFuture httpFuture = coordinator.registerGrpcArrival(request);
+ assertTrue(httpFuture.isDone(), "HTTP future should already be resolved once gRPC arrives");
+ assertSame(exchange, httpFuture.get(1, TimeUnit.SECONDS));
+ assertSame(request, grpcFuture.get(1, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void grpcArrivesBeforeHttp() throws Exception {
+ HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
+ HttpExchange exchange = mock(HttpExchange.class);
+ InvocationRequest request = InvocationRequest.newBuilder().setInvocationId(INVOCATION_ID).build();
+
+ CompletableFuture httpFuture = coordinator.registerGrpcArrival(request);
+ assertFalse(httpFuture.isDone(), "HTTP future should still be pending before HTTP arrival");
+
+ CompletableFuture grpcFuture = coordinator.registerHttpArrival(INVOCATION_ID, exchange);
+ assertTrue(grpcFuture.isDone(), "gRPC future should already be resolved once HTTP arrives");
+ assertSame(exchange, httpFuture.get(1, TimeUnit.SECONDS));
+ assertSame(request, grpcFuture.get(1, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void releaseInvocationRemovesSlot() throws Exception {
+ HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
+ HttpExchange exchange = mock(HttpExchange.class);
+ coordinator.registerHttpArrival(INVOCATION_ID, exchange);
+ assertEquals(1, coordinator.activeInvocationCount());
+
+ coordinator.releaseInvocation(INVOCATION_ID);
+ assertEquals(0, coordinator.activeInvocationCount());
+ }
+
+ @Test
+ public void releaseInvocationIsIdempotent() {
+ HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
+ // Releasing an unknown invocation does not throw.
+ coordinator.releaseInvocation("unknown");
+ coordinator.releaseInvocation("unknown");
+ }
+
+ @Test
+ public void failInvocationPropagatesToFutures() {
+ HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
+ HttpExchange exchange = mock(HttpExchange.class);
+ CompletableFuture grpcFuture = coordinator.registerHttpArrival(INVOCATION_ID, exchange);
+
+ IOException cause = new IOException("boom");
+ coordinator.failInvocation(INVOCATION_ID, cause);
+
+ ExecutionException ex = assertThrows(ExecutionException.class,
+ () -> grpcFuture.get(1, TimeUnit.SECONDS));
+ assertSame(cause, ex.getCause());
+ assertEquals(0, coordinator.activeInvocationCount());
+ }
+
+ @Test
+ public void duplicateHttpArrivalThrows() {
+ HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
+ HttpExchange exchange = mock(HttpExchange.class);
+ coordinator.registerHttpArrival(INVOCATION_ID, exchange);
+ assertThrows(IllegalStateException.class,
+ () -> coordinator.registerHttpArrival(INVOCATION_ID, exchange));
+ }
+
+ @Test
+ public void duplicateGrpcArrivalThrows() {
+ HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
+ InvocationRequest request = InvocationRequest.newBuilder().setInvocationId(INVOCATION_ID).build();
+ coordinator.registerGrpcArrival(request);
+ assertThrows(IllegalStateException.class,
+ () -> coordinator.registerGrpcArrival(request));
+ }
+
+ @Test
+ public void independentInvocationsDoNotInterfere() throws Exception {
+ HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
+ HttpExchange exchangeA = mock(HttpExchange.class);
+ HttpExchange exchangeB = mock(HttpExchange.class);
+ InvocationRequest reqA = InvocationRequest.newBuilder().setInvocationId("a").build();
+ InvocationRequest reqB = InvocationRequest.newBuilder().setInvocationId("b").build();
+
+ CompletableFuture grpcA = coordinator.registerHttpArrival("a", exchangeA);
+ CompletableFuture grpcB = coordinator.registerHttpArrival("b", exchangeB);
+ // Resolve only A; B must still be pending.
+ coordinator.registerGrpcArrival(reqA);
+ assertTrue(grpcA.isDone());
+ assertFalse(grpcB.isDone());
+
+ coordinator.registerGrpcArrival(reqB);
+ assertSame(reqA, grpcA.get(1, TimeUnit.SECONDS));
+ assertSame(reqB, grpcB.get(1, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void grpcFutureRemainsPendingUntilHttpArrives() {
+ HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
+ InvocationRequest request = InvocationRequest.newBuilder().setInvocationId(INVOCATION_ID).build();
+ CompletableFuture httpFuture = coordinator.registerGrpcArrival(request);
+ // No HTTP arrival; future must time out.
+ assertThrows(TimeoutException.class, () -> httpFuture.get(50, TimeUnit.MILLISECONDS));
+ }
+}
diff --git a/src/test/java/com/microsoft/azure/functions/worker/http/HttpProxyServerTest.java b/src/test/java/com/microsoft/azure/functions/worker/http/HttpProxyServerTest.java
new file mode 100644
index 0000000..f9f04a3
--- /dev/null
+++ b/src/test/java/com/microsoft/azure/functions/worker/http/HttpProxyServerTest.java
@@ -0,0 +1,115 @@
+package com.microsoft.azure.functions.worker.http;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.sun.net.httpserver.HttpHandler;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+public class HttpProxyServerTest {
+
+ private HttpProxyServer server;
+
+ @AfterEach
+ public void tearDown() {
+ if (server != null) {
+ server.close();
+ server = null;
+ }
+ }
+
+ @Test
+ public void startBindsToEphemeralPortAndReturnsUri() throws Exception {
+ server = new HttpProxyServer(ProxyConfig.defaults());
+ String uri = server.start(noOpHandler());
+ assertNotNull(uri);
+ assertTrue(uri.startsWith("http://127.0.0.1:"), "Expected loopback URI, got " + uri);
+ URI parsed = URI.create(uri);
+ assertTrue(parsed.getPort() > 0, "Expected a real port number, got " + parsed.getPort());
+ assertEquals(uri, server.getBoundUri());
+ }
+
+ @Test
+ public void getBoundUriReturnsNullBeforeStart() {
+ server = new HttpProxyServer(ProxyConfig.defaults());
+ assertNull(server.getBoundUri());
+ }
+
+ @Test
+ public void closeBeforeStartIsNoop() {
+ server = new HttpProxyServer(ProxyConfig.defaults());
+ server.close();
+ assertNull(server.getBoundUri());
+ }
+
+ @Test
+ public void doubleStartThrows() throws Exception {
+ server = new HttpProxyServer(ProxyConfig.defaults());
+ server.start(noOpHandler());
+ assertThrows(IllegalStateException.class, () -> server.start(noOpHandler()));
+ }
+
+ @Test
+ public void routesIncomingRequestToHandler() throws Exception {
+ server = new HttpProxyServer(ProxyConfig.defaults());
+ AtomicReference seenPath = new AtomicReference<>();
+ AtomicReference seenHeader = new AtomicReference<>();
+ String uri = server.start(exchange -> {
+ seenPath.set(exchange.getRequestURI().getPath());
+ seenHeader.set(exchange.getRequestHeaders().getFirst("x-ms-invocation-id"));
+ byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
+ exchange.sendResponseHeaders(200, body.length);
+ try (OutputStream os = exchange.getResponseBody()) {
+ os.write(body);
+ }
+ });
+
+ HttpURLConnection conn = (HttpURLConnection) URI.create(uri + "/some/route").toURL().openConnection();
+ conn.setRequestProperty("x-ms-invocation-id", "test-123");
+ conn.connect();
+ try {
+ assertEquals(200, conn.getResponseCode());
+ try (BufferedReader reader = new BufferedReader(
+ new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
+ assertEquals("hello", reader.readLine());
+ }
+ } finally {
+ conn.disconnect();
+ }
+ assertEquals("/some/route", seenPath.get());
+ assertEquals("test-123", seenHeader.get());
+ }
+
+ @Test
+ public void closeStopsAcceptingConnections() throws Exception {
+ server = new HttpProxyServer(ProxyConfig.defaults());
+ String uri = server.start(noOpHandler());
+ server.close();
+ server = null;
+ HttpURLConnection conn = (HttpURLConnection) URI.create(uri + "/").toURL().openConnection();
+ conn.setConnectTimeout(500);
+ conn.setReadTimeout(500);
+ // After close, the next connect attempt must fail (connection refused).
+ assertThrows(Exception.class, conn::connect);
+ }
+
+ private static HttpHandler noOpHandler() {
+ return exchange -> {
+ exchange.sendResponseHeaders(204, -1);
+ exchange.close();
+ };
+ }
+}
From b0bd471d036b3f6d83f7b2e9e81d7e550fb2b308 Mon Sep 17 00:00:00 2001
From: Ahmed Muhsin
Date: Wed, 3 Jun 2026 14:34:35 -0500
Subject: [PATCH 02/10] Bridge HTTP proxy traffic to gRPC invocation pipeline
Adds HttpProxyHandler and HttpBodyBridge to connect the embedded HTTP
proxy server to the existing gRPC invocation dispatch path.
HttpProxyHandler parks each forwarded request on
HttpInvocationCoordinator (keyed by x-ms-invocation-id), then blocks on
the slot's completion future so the connection stays open until the
gRPC side has fully written the response. Missing header / duplicate
arrival / invocation failure paths return appropriate HTTP error
responses.
HttpBodyBridge handles the request body <-> protobuf body conversion
and writes RpcHttp responses back to HttpExchange. Body classification
mirrors the host's PopulateBody behavior (application/json -> JSON
TypedData, text/form-encoded/xml/js -> string, otherwise bytes) so
downstream RpcHttpRequestDataSource logic continues to work unchanged.
API refactor: HttpInvocationCoordinator.register*Arrival() now returns
the HttpInvocationSlot itself instead of the opposite side's future,
because the HTTP handler needs to await completion() (not grpcArrival).
HttpInvocationSlot is now public to expose this surface.
Tests:
- HttpBodyBridgeTest: 15 tests covering content-type classification,
charset handling, body enrichment, response writing, and status
parsing.
- HttpProxyHandlerTest: 4 tests covering missing-header,
successful-completion, invocation-failure, and duplicate-arrival
paths.
- HttpInvocationCoordinatorTest: updated for the new slot-returning
API; added completionFutureResolvesOnRelease.
No worker wiring yet -- the proxy server is not started and no
capability is advertised. This change just stages the parts that the
next commit will glue into JavaWorkerClient + the request handlers.
---
.../functions/worker/http/HttpBodyBridge.java | 226 +++++++++++++++++
.../http/HttpInvocationCoordinator.java | 21 +-
.../worker/http/HttpInvocationSlot.java | 34 ++-
.../worker/http/HttpProxyHandler.java | 101 ++++++++
.../worker/http/HttpBodyBridgeTest.java | 230 ++++++++++++++++++
.../http/HttpInvocationCoordinatorTest.java | 60 +++--
.../worker/http/HttpProxyHandlerTest.java | 129 ++++++++++
7 files changed, 759 insertions(+), 42 deletions(-)
create mode 100644 src/main/java/com/microsoft/azure/functions/worker/http/HttpBodyBridge.java
create mode 100644 src/main/java/com/microsoft/azure/functions/worker/http/HttpProxyHandler.java
create mode 100644 src/test/java/com/microsoft/azure/functions/worker/http/HttpBodyBridgeTest.java
create mode 100644 src/test/java/com/microsoft/azure/functions/worker/http/HttpProxyHandlerTest.java
diff --git a/src/main/java/com/microsoft/azure/functions/worker/http/HttpBodyBridge.java b/src/main/java/com/microsoft/azure/functions/worker/http/HttpBodyBridge.java
new file mode 100644
index 0000000..1ca9da7
--- /dev/null
+++ b/src/main/java/com/microsoft/azure/functions/worker/http/HttpBodyBridge.java
@@ -0,0 +1,226 @@
+package com.microsoft.azure.functions.worker.http;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import com.google.protobuf.ByteString;
+import com.microsoft.azure.functions.rpc.messages.InvocationRequest;
+import com.microsoft.azure.functions.rpc.messages.ParameterBinding;
+import com.microsoft.azure.functions.rpc.messages.RpcHttp;
+import com.microsoft.azure.functions.rpc.messages.TypedData;
+import com.sun.net.httpserver.Headers;
+import com.sun.net.httpserver.HttpExchange;
+
+/**
+ * Bridges between the JDK {@link HttpExchange} surface and the Functions
+ * worker's protobuf-based binding plumbing.
+ *
+ *
For HTTP-proxied invocations, the Functions host sends the request body
+ * and headers over HTTP (to the worker's embedded proxy server) and the trigger
+ * metadata over gRPC (in an {@link InvocationRequest} whose HTTP input has an
+ * empty body). This class:
+ *
+ *
Reads the body off the {@code HttpExchange} and folds it into the
+ * {@code RpcHttp} payload that downstream binding code expects.
+ *
Writes the {@code RpcHttp} response produced by the user function back
+ * to the {@code HttpExchange}.
+ *
+ *
+ *
Body classification mirrors the host's {@code PopulateBody} logic and the
+ * existing worker behavior for in-process bodies: {@code application/json} →
+ * {@code TypedData.json}; {@code text/*} and form-encoded → {@code TypedData.string};
+ * everything else (including absent {@code Content-Type}) → {@code TypedData.bytes}.
+ */
+public final class HttpBodyBridge {
+ private static final String CONTENT_TYPE_HEADER = "Content-Type";
+ private static final int READ_CHUNK = 8192;
+
+ private HttpBodyBridge() {
+ }
+
+ /**
+ * Returns a copy of {@code request} with the body of its HTTP input replaced
+ * by the body read from {@code exchange}. If no input parameter holds an
+ * {@code RpcHttp} payload, {@code request} is returned unchanged.
+ *
+ *
The body is read eagerly into memory. Streaming support (InputStream
+ * as a parameter type) is layered on in a later commit and bypasses this
+ * eager read.
+ */
+ public static InvocationRequest enrichRequestWithBody(InvocationRequest request, HttpExchange exchange)
+ throws IOException {
+ InvocationRequest.Builder builder = request.toBuilder();
+ List inputs = request.getInputDataList();
+ boolean enriched = false;
+ byte[] body = null;
+ for (int i = 0; i < inputs.size(); i++) {
+ ParameterBinding input = inputs.get(i);
+ if (!input.getData().hasHttp()) {
+ continue;
+ }
+ if (body == null) {
+ body = readBody(exchange);
+ }
+ RpcHttp.Builder httpBuilder = input.getData().getHttp().toBuilder();
+ httpBuilder.setBody(buildBodyTypedData(body, contentType(exchange.getRequestHeaders())));
+ TypedData.Builder dataBuilder = input.getData().toBuilder().setHttp(httpBuilder);
+ ParameterBinding patched = input.toBuilder().setData(dataBuilder).build();
+ builder.setInputData(i, patched);
+ enriched = true;
+ }
+ return enriched ? builder.build() : request;
+ }
+
+ /**
+ * Writes an {@link RpcHttp} response (status + headers + body) to the
+ * given {@link HttpExchange}. The exchange is left open for the caller
+ * to close.
+ */
+ public static void writeRpcHttpResponse(HttpExchange exchange, RpcHttp response) throws IOException {
+ int status = parseStatus(response.getStatusCode());
+ for (Map.Entry header : response.getHeadersMap().entrySet()) {
+ exchange.getResponseHeaders().add(header.getKey(), header.getValue());
+ }
+ byte[] bodyBytes = extractBodyBytes(response.getBody(), response.getHeadersMap());
+ if (bodyBytes.length == 0) {
+ // -1 == no response body; the response body stream does not need to be opened.
+ exchange.sendResponseHeaders(status, -1);
+ } else {
+ exchange.sendResponseHeaders(status, bodyBytes.length);
+ try (OutputStream os = exchange.getResponseBody()) {
+ os.write(bodyBytes);
+ }
+ }
+ }
+
+ /**
+ * Writes a plain text error response to the exchange. Used by the proxy
+ * handler when it cannot wire up an invocation (missing header, lost
+ * coordinator, etc.).
+ */
+ public static void writeErrorResponse(HttpExchange exchange, int status, String message) throws IOException {
+ byte[] body = (message == null ? "" : message).getBytes(StandardCharsets.UTF_8);
+ exchange.getResponseHeaders().set("Content-Type", "text/plain; charset=utf-8");
+ exchange.sendResponseHeaders(status, body.length);
+ try (OutputStream os = exchange.getResponseBody()) {
+ os.write(body);
+ }
+ }
+
+ static TypedData buildBodyTypedData(byte[] bytes, String contentType) {
+ if (contentType != null) {
+ String normalized = contentType.toLowerCase(Locale.ROOT);
+ if (normalized.startsWith("application/json")) {
+ return TypedData.newBuilder()
+ .setJson(new String(bytes, charsetFromContentType(contentType)))
+ .build();
+ }
+ if (normalized.startsWith("text/")
+ || normalized.startsWith("application/x-www-form-urlencoded")
+ || normalized.startsWith("application/xml")
+ || normalized.startsWith("application/javascript")) {
+ return TypedData.newBuilder()
+ .setString(new String(bytes, charsetFromContentType(contentType)))
+ .build();
+ }
+ }
+ return TypedData.newBuilder()
+ .setBytes(ByteString.copyFrom(bytes))
+ .build();
+ }
+
+ private static byte[] readBody(HttpExchange exchange) throws IOException {
+ try (InputStream in = exchange.getRequestBody()) {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ byte[] chunk = new byte[READ_CHUNK];
+ int n;
+ while ((n = in.read(chunk)) != -1) {
+ out.write(chunk, 0, n);
+ }
+ return out.toByteArray();
+ }
+ }
+
+ private static String contentType(Headers headers) {
+ return headers.getFirst(CONTENT_TYPE_HEADER);
+ }
+
+ private static Charset charsetFromContentType(String contentType) {
+ if (contentType == null) {
+ return StandardCharsets.UTF_8;
+ }
+ int idx = contentType.toLowerCase(Locale.ROOT).indexOf("charset=");
+ if (idx < 0) {
+ return StandardCharsets.UTF_8;
+ }
+ String charset = contentType.substring(idx + "charset=".length()).trim();
+ int semi = charset.indexOf(';');
+ if (semi >= 0) {
+ charset = charset.substring(0, semi).trim();
+ }
+ // strip enclosing quotes
+ if (charset.length() >= 2
+ && charset.charAt(0) == '"'
+ && charset.charAt(charset.length() - 1) == '"') {
+ charset = charset.substring(1, charset.length() - 1);
+ }
+ try {
+ return Charset.forName(charset);
+ } catch (RuntimeException ex) {
+ return StandardCharsets.UTF_8;
+ }
+ }
+
+ private static int parseStatus(String statusCode) {
+ if (statusCode == null || statusCode.isEmpty()) {
+ return 200;
+ }
+ try {
+ int status = Integer.parseInt(statusCode);
+ if (status < 100 || status > 599) {
+ return 500;
+ }
+ return status;
+ } catch (NumberFormatException ex) {
+ return 500;
+ }
+ }
+
+ private static byte[] extractBodyBytes(TypedData body, Map headers) {
+ if (body == null) {
+ return new byte[0];
+ }
+ switch (body.getDataCase()) {
+ case BYTES:
+ return body.getBytes().toByteArray();
+ case STRING:
+ return body.getString().getBytes(charsetFromContentType(headerLookup(headers, "Content-Type")));
+ case JSON:
+ return body.getJson().getBytes(StandardCharsets.UTF_8);
+ case DATA_NOT_SET:
+ return new byte[0];
+ default:
+ // Unsupported body shapes are coerced to their string form so we never drop the response.
+ return body.toString().getBytes(StandardCharsets.UTF_8);
+ }
+ }
+
+ private static String headerLookup(Map headers, String key) {
+ if (headers == null) {
+ return null;
+ }
+ for (Map.Entry entry : headers.entrySet()) {
+ if (key.equalsIgnoreCase(entry.getKey())) {
+ return entry.getValue();
+ }
+ }
+ return null;
+ }
+}
diff --git a/src/main/java/com/microsoft/azure/functions/worker/http/HttpInvocationCoordinator.java b/src/main/java/com/microsoft/azure/functions/worker/http/HttpInvocationCoordinator.java
index fbe47de..1fa93cc 100644
--- a/src/main/java/com/microsoft/azure/functions/worker/http/HttpInvocationCoordinator.java
+++ b/src/main/java/com/microsoft/azure/functions/worker/http/HttpInvocationCoordinator.java
@@ -1,7 +1,6 @@
package com.microsoft.azure.functions.worker.http;
import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -34,10 +33,12 @@ public final class HttpInvocationCoordinator {
/**
* Registers the arrival of an HTTP request for the given invocation.
- * If the matching gRPC dispatch has not yet arrived, the returned future
- * resolves once it does. Called by the HTTP proxy handler.
+ * Returns the slot so the HTTP handler can await
+ * {@link HttpInvocationSlot#completion()}.
+ *
+ * @throws IllegalStateException if HTTP arrival was already registered for this id
*/
- public CompletableFuture registerHttpArrival(String invocationId, HttpExchange exchange) {
+ public HttpInvocationSlot registerHttpArrival(String invocationId, HttpExchange exchange) {
Objects.requireNonNull(invocationId, "invocationId");
Objects.requireNonNull(exchange, "exchange");
HttpInvocationSlot slot = slots.computeIfAbsent(invocationId, HttpInvocationSlot::new);
@@ -45,15 +46,17 @@ public CompletableFuture registerHttpArrival(String invocatio
throw new IllegalStateException(
"HTTP arrival already registered for invocation " + invocationId);
}
- return slot.grpcArrival();
+ return slot;
}
/**
* Registers the arrival of a gRPC InvocationRequest for the given invocation.
- * If the matching HTTP request has not yet arrived, the returned future
- * resolves once it does. Called by the gRPC invocation dispatcher.
+ * Returns the slot so the gRPC dispatcher can await
+ * {@link HttpInvocationSlot#httpArrival()}.
+ *
+ * @throws IllegalStateException if gRPC arrival was already registered for this id
*/
- public CompletableFuture registerGrpcArrival(InvocationRequest request) {
+ public HttpInvocationSlot registerGrpcArrival(InvocationRequest request) {
Objects.requireNonNull(request, "request");
String invocationId = request.getInvocationId();
HttpInvocationSlot slot = slots.computeIfAbsent(invocationId, HttpInvocationSlot::new);
@@ -61,7 +64,7 @@ public CompletableFuture registerGrpcArrival(InvocationRequest req
throw new IllegalStateException(
"gRPC arrival already registered for invocation " + invocationId);
}
- return slot.httpArrival();
+ return slot;
}
/**
diff --git a/src/main/java/com/microsoft/azure/functions/worker/http/HttpInvocationSlot.java b/src/main/java/com/microsoft/azure/functions/worker/http/HttpInvocationSlot.java
index 7a5e309..6f4c77c 100644
--- a/src/main/java/com/microsoft/azure/functions/worker/http/HttpInvocationSlot.java
+++ b/src/main/java/com/microsoft/azure/functions/worker/http/HttpInvocationSlot.java
@@ -17,14 +17,15 @@
*
*
*
Either side may arrive first. The slot exposes futures that the HTTP
- * handler thread and the gRPC dispatcher thread wait on. The {@code completion}
- * future is signaled once the invocation has fully responded, allowing the slot
- * to be released from the coordinator's map.
+ * handler thread and the gRPC dispatcher thread wait on. The {@link #completion}
+ * future is signaled once the invocation has fully responded, allowing the HTTP
+ * handler to return from {@code handle()} so the server can close the exchange.
*
- *
Instances are package-private; use {@link HttpInvocationCoordinator} to
- * acquire and release slots.
+ *
The class is mutable from the coordinator's perspective only; consumers
+ * see immutable {@link CompletableFuture} handles and use them to await
+ * rendezvous and completion.
*/
-final class HttpInvocationSlot {
+public final class HttpInvocationSlot {
private final String invocationId;
private final CompletableFuture httpArrival = new CompletableFuture<>();
private final CompletableFuture grpcArrival = new CompletableFuture<>();
@@ -34,19 +35,32 @@ final class HttpInvocationSlot {
this.invocationId = invocationId;
}
- String getInvocationId() {
+ public String getInvocationId() {
return invocationId;
}
- CompletableFuture httpArrival() {
+ /**
+ * Future that resolves when the HTTP request for this invocation arrives.
+ * Consumed by the gRPC dispatcher thread.
+ */
+ public CompletableFuture httpArrival() {
return httpArrival;
}
- CompletableFuture grpcArrival() {
+ /**
+ * Future that resolves when the gRPC {@code InvocationRequest} for this
+ * invocation arrives. Consumed by the HTTP handler thread.
+ */
+ public CompletableFuture grpcArrival() {
return grpcArrival;
}
- CompletableFuture completion() {
+ /**
+ * Future that resolves when the invocation has fully completed (response
+ * written to HTTP, output bindings collected for the gRPC response).
+ * The HTTP handler thread waits on this before returning from {@code handle()}.
+ */
+ public CompletableFuture completion() {
return completion;
}
}
diff --git a/src/main/java/com/microsoft/azure/functions/worker/http/HttpProxyHandler.java b/src/main/java/com/microsoft/azure/functions/worker/http/HttpProxyHandler.java
new file mode 100644
index 0000000..2b74841
--- /dev/null
+++ b/src/main/java/com/microsoft/azure/functions/worker/http/HttpProxyHandler.java
@@ -0,0 +1,101 @@
+package com.microsoft.azure.functions.worker.http;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.microsoft.azure.functions.worker.WorkerLogManager;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+
+/**
+ * Handler attached to the worker's embedded HTTP proxy server.
+ *
+ *
Receives HTTP requests forwarded by the Functions host (via the HttpUri
+ * capability) and parks them on the {@link HttpInvocationCoordinator} until
+ * the gRPC dispatcher picks them up. The actual invocation runs on the gRPC
+ * dispatch thread, which reads the request body and writes the response back
+ * to the same {@link HttpExchange}. This handler simply:
+ *
+ *
Extracts {@code x-ms-invocation-id} from the request headers.
+ *
Registers the HTTP arrival with the coordinator.
+ *
Blocks on the slot's {@code completion} future so the exchange stays
+ * open until the gRPC side finishes writing the response.
+ *
Returns from {@code handle()}, letting the JDK HttpServer close the
+ * exchange.
+ *
+ *
+ *
Missing header or unexpected failures are converted into appropriate HTTP
+ * error responses so the host always gets a closed connection.
+ */
+public final class HttpProxyHandler implements HttpHandler {
+ /** Header set by {@code DefaultHttpProxyService} on the host side. */
+ public static final String INVOCATION_ID_HEADER = "x-ms-invocation-id";
+
+ private static final Logger LOGGER = WorkerLogManager.getSystemLogger();
+
+ private final HttpInvocationCoordinator coordinator;
+
+ public HttpProxyHandler(HttpInvocationCoordinator coordinator) {
+ this.coordinator = Objects.requireNonNull(coordinator, "coordinator");
+ }
+
+ @Override
+ public void handle(HttpExchange exchange) throws IOException {
+ String invocationId = exchange.getRequestHeaders().getFirst(INVOCATION_ID_HEADER);
+ if (invocationId == null || invocationId.isEmpty()) {
+ LOGGER.warning("HTTP proxy request missing " + INVOCATION_ID_HEADER + " header");
+ try {
+ HttpBodyBridge.writeErrorResponse(exchange, 400,
+ "Missing required header: " + INVOCATION_ID_HEADER);
+ } finally {
+ exchange.close();
+ }
+ return;
+ }
+
+ HttpInvocationSlot slot;
+ try {
+ slot = coordinator.registerHttpArrival(invocationId, exchange);
+ } catch (IllegalStateException ex) {
+ LOGGER.log(Level.WARNING, "Duplicate HTTP arrival for invocation " + invocationId, ex);
+ try {
+ HttpBodyBridge.writeErrorResponse(exchange, 409,
+ "Duplicate HTTP arrival for invocation " + invocationId);
+ } finally {
+ exchange.close();
+ }
+ return;
+ }
+
+ try {
+ // Block until the gRPC dispatcher signals invocation completion.
+ // The dispatcher is responsible for writing the response to this
+ // exchange; we simply hold the connection open in the meantime.
+ slot.completion().get();
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ tryWriteError(exchange, 503, "Worker interrupted while waiting for invocation");
+ } catch (CancellationException ex) {
+ // Coordinator cancelled the futures via releaseInvocation();
+ // the gRPC side has already written (or chosen not to write) the response.
+ } catch (ExecutionException ex) {
+ Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
+ LOGGER.log(Level.WARNING, "Invocation " + invocationId + " failed before responding", cause);
+ tryWriteError(exchange, 500, "Invocation failed: " + cause.getMessage());
+ } finally {
+ exchange.close();
+ }
+ }
+
+ private static void tryWriteError(HttpExchange exchange, int status, String message) {
+ try {
+ HttpBodyBridge.writeErrorResponse(exchange, status, message);
+ } catch (IOException ioe) {
+ LOGGER.log(Level.FINE, "Unable to write error response (response likely already started)", ioe);
+ }
+ }
+}
diff --git a/src/test/java/com/microsoft/azure/functions/worker/http/HttpBodyBridgeTest.java b/src/test/java/com/microsoft/azure/functions/worker/http/HttpBodyBridgeTest.java
new file mode 100644
index 0000000..e48f0e4
--- /dev/null
+++ b/src/test/java/com/microsoft/azure/functions/worker/http/HttpBodyBridgeTest.java
@@ -0,0 +1,230 @@
+package com.microsoft.azure.functions.worker.http;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import com.google.protobuf.ByteString;
+import com.microsoft.azure.functions.rpc.messages.InvocationRequest;
+import com.microsoft.azure.functions.rpc.messages.ParameterBinding;
+import com.microsoft.azure.functions.rpc.messages.RpcHttp;
+import com.microsoft.azure.functions.rpc.messages.TypedData;
+import com.sun.net.httpserver.Headers;
+import com.sun.net.httpserver.HttpExchange;
+
+import org.junit.jupiter.api.Test;
+
+public class HttpBodyBridgeTest {
+
+ @Test
+ public void buildBodyTypedDataJsonContentType() {
+ TypedData data = HttpBodyBridge.buildBodyTypedData(
+ "{\"k\":1}".getBytes(StandardCharsets.UTF_8), "application/json");
+ assertEquals(TypedData.DataCase.JSON, data.getDataCase());
+ assertEquals("{\"k\":1}", data.getJson());
+ }
+
+ @Test
+ public void buildBodyTypedDataJsonWithCharsetSuffix() {
+ TypedData data = HttpBodyBridge.buildBodyTypedData(
+ "{}".getBytes(StandardCharsets.UTF_8), "application/json; charset=utf-8");
+ assertEquals(TypedData.DataCase.JSON, data.getDataCase());
+ }
+
+ @Test
+ public void buildBodyTypedDataTextContentType() {
+ TypedData data = HttpBodyBridge.buildBodyTypedData(
+ "hello".getBytes(StandardCharsets.UTF_8), "text/plain");
+ assertEquals(TypedData.DataCase.STRING, data.getDataCase());
+ assertEquals("hello", data.getString());
+ }
+
+ @Test
+ public void buildBodyTypedDataFormEncoded() {
+ TypedData data = HttpBodyBridge.buildBodyTypedData(
+ "a=1&b=2".getBytes(StandardCharsets.UTF_8), "application/x-www-form-urlencoded");
+ assertEquals(TypedData.DataCase.STRING, data.getDataCase());
+ }
+
+ @Test
+ public void buildBodyTypedDataBinaryWhenNoContentType() {
+ byte[] bytes = new byte[]{1, 2, 3};
+ TypedData data = HttpBodyBridge.buildBodyTypedData(bytes, null);
+ assertEquals(TypedData.DataCase.BYTES, data.getDataCase());
+ assertArrayEquals(bytes, data.getBytes().toByteArray());
+ }
+
+ @Test
+ public void buildBodyTypedDataBinaryWhenOctetStream() {
+ byte[] bytes = "binary".getBytes(StandardCharsets.UTF_8);
+ TypedData data = HttpBodyBridge.buildBodyTypedData(bytes, "application/octet-stream");
+ assertEquals(TypedData.DataCase.BYTES, data.getDataCase());
+ }
+
+ @Test
+ public void buildBodyTypedDataRespectsCharset() {
+ byte[] bytes = "héllo".getBytes(StandardCharsets.ISO_8859_1);
+ TypedData data = HttpBodyBridge.buildBodyTypedData(bytes, "text/plain; charset=ISO-8859-1");
+ assertEquals(TypedData.DataCase.STRING, data.getDataCase());
+ assertEquals("héllo", data.getString());
+ }
+
+ @Test
+ public void enrichRequestWithBodyReplacesHttpBody() throws Exception {
+ InvocationRequest original = InvocationRequest.newBuilder()
+ .setInvocationId("inv-1")
+ .addInputData(ParameterBinding.newBuilder()
+ .setName("req")
+ .setData(TypedData.newBuilder()
+ .setHttp(RpcHttp.newBuilder().setMethod("POST").setUrl("http://localhost/api/x"))))
+ .build();
+ HttpExchange exchange = mockExchangeWithBody("payload".getBytes(StandardCharsets.UTF_8), "text/plain");
+
+ InvocationRequest enriched = HttpBodyBridge.enrichRequestWithBody(original, exchange);
+ assertNotSame(original, enriched);
+ TypedData body = enriched.getInputData(0).getData().getHttp().getBody();
+ assertEquals(TypedData.DataCase.STRING, body.getDataCase());
+ assertEquals("payload", body.getString());
+ // Method/url preserved.
+ assertEquals("POST", enriched.getInputData(0).getData().getHttp().getMethod());
+ }
+
+ @Test
+ public void enrichRequestWithBodyReturnsSameWhenNoHttpInput() throws Exception {
+ InvocationRequest original = InvocationRequest.newBuilder()
+ .setInvocationId("inv-1")
+ .addInputData(ParameterBinding.newBuilder()
+ .setName("queueItem")
+ .setData(TypedData.newBuilder().setString("hello")))
+ .build();
+ HttpExchange exchange = mock(HttpExchange.class);
+ InvocationRequest result = HttpBodyBridge.enrichRequestWithBody(original, exchange);
+ assertSame(original, result, "Non-HTTP requests should not be modified");
+ }
+
+ @Test
+ public void enrichRequestReadsChunkedBodyLargerThanBuffer() throws Exception {
+ // Simulate transfer-encoding: chunked by providing a body larger than the read buffer.
+ byte[] big = new byte[20_000];
+ for (int i = 0; i < big.length; i++) {
+ big[i] = (byte) (i & 0xff);
+ }
+ InvocationRequest original = InvocationRequest.newBuilder()
+ .setInvocationId("inv-big")
+ .addInputData(ParameterBinding.newBuilder().setName("req")
+ .setData(TypedData.newBuilder().setHttp(RpcHttp.newBuilder().setMethod("POST"))))
+ .build();
+ HttpExchange exchange = mockExchangeWithBody(big, "application/octet-stream");
+
+ InvocationRequest enriched = HttpBodyBridge.enrichRequestWithBody(original, exchange);
+ TypedData body = enriched.getInputData(0).getData().getHttp().getBody();
+ assertEquals(TypedData.DataCase.BYTES, body.getDataCase());
+ assertArrayEquals(big, body.getBytes().toByteArray());
+ }
+
+ @Test
+ public void writeRpcHttpResponseWritesStatusHeadersAndBody() throws Exception {
+ ByteArrayOutputStream captured = new ByteArrayOutputStream();
+ Headers responseHeaders = new Headers();
+ HttpExchange exchange = mock(HttpExchange.class);
+ when(exchange.getResponseHeaders()).thenReturn(responseHeaders);
+ when(exchange.getResponseBody()).thenReturn(captured);
+
+ RpcHttp response = RpcHttp.newBuilder()
+ .setStatusCode("201")
+ .putHeaders("Content-Type", "application/json")
+ .putHeaders("X-Custom", "value")
+ .setBody(TypedData.newBuilder().setJson("{\"ok\":true}"))
+ .build();
+
+ HttpBodyBridge.writeRpcHttpResponse(exchange, response);
+
+ byte[] expected = "{\"ok\":true}".getBytes(StandardCharsets.UTF_8);
+ verify(exchange).sendResponseHeaders(201, expected.length);
+ assertArrayEquals(expected, captured.toByteArray());
+ assertEquals("application/json", responseHeaders.getFirst("Content-Type"));
+ assertEquals("value", responseHeaders.getFirst("X-Custom"));
+ }
+
+ @Test
+ public void writeRpcHttpResponseHandlesEmptyBody() throws Exception {
+ ByteArrayOutputStream captured = new ByteArrayOutputStream();
+ HttpExchange exchange = mock(HttpExchange.class);
+ when(exchange.getResponseHeaders()).thenReturn(new Headers());
+ when(exchange.getResponseBody()).thenReturn(captured);
+
+ RpcHttp response = RpcHttp.newBuilder().setStatusCode("204").build();
+ HttpBodyBridge.writeRpcHttpResponse(exchange, response);
+
+ verify(exchange).sendResponseHeaders(204, -1);
+ assertEquals(0, captured.size());
+ }
+
+ @Test
+ public void writeRpcHttpResponseHandlesBytesBody() throws Exception {
+ ByteArrayOutputStream captured = new ByteArrayOutputStream();
+ HttpExchange exchange = mock(HttpExchange.class);
+ when(exchange.getResponseHeaders()).thenReturn(new Headers());
+ when(exchange.getResponseBody()).thenReturn(captured);
+
+ byte[] payload = new byte[]{0x01, 0x02, 0x03};
+ RpcHttp response = RpcHttp.newBuilder()
+ .setStatusCode("200")
+ .setBody(TypedData.newBuilder().setBytes(ByteString.copyFrom(payload)))
+ .build();
+ HttpBodyBridge.writeRpcHttpResponse(exchange, response);
+
+ verify(exchange).sendResponseHeaders(200, payload.length);
+ assertArrayEquals(payload, captured.toByteArray());
+ }
+
+ @Test
+ public void writeRpcHttpResponseDefaultsInvalidStatusTo500() throws Exception {
+ ByteArrayOutputStream captured = new ByteArrayOutputStream();
+ HttpExchange exchange = mock(HttpExchange.class);
+ when(exchange.getResponseHeaders()).thenReturn(new Headers());
+ when(exchange.getResponseBody()).thenReturn(captured);
+
+ RpcHttp response = RpcHttp.newBuilder().setStatusCode("not-a-number").build();
+ HttpBodyBridge.writeRpcHttpResponse(exchange, response);
+
+ verify(exchange).sendResponseHeaders(500, -1);
+ }
+
+ @Test
+ public void writeErrorResponseWritesPlainText() throws Exception {
+ ByteArrayOutputStream captured = new ByteArrayOutputStream();
+ Headers headers = new Headers();
+ HttpExchange exchange = mock(HttpExchange.class);
+ when(exchange.getResponseHeaders()).thenReturn(headers);
+ when(exchange.getResponseBody()).thenReturn(captured);
+
+ HttpBodyBridge.writeErrorResponse(exchange, 418, "I'm a teapot");
+
+ byte[] expected = "I'm a teapot".getBytes(StandardCharsets.UTF_8);
+ verify(exchange).sendResponseHeaders(418, expected.length);
+ assertArrayEquals(expected, captured.toByteArray());
+ assertTrue(headers.getFirst("Content-Type").startsWith("text/plain"));
+ }
+
+ private static HttpExchange mockExchangeWithBody(byte[] body, String contentType) throws IOException {
+ HttpExchange exchange = mock(HttpExchange.class);
+ Headers headers = new Headers();
+ if (contentType != null) {
+ headers.add("Content-Type", contentType);
+ }
+ when(exchange.getRequestHeaders()).thenReturn(headers);
+ when(exchange.getRequestBody()).thenReturn(new ByteArrayInputStream(body));
+ return exchange;
+ }
+}
diff --git a/src/test/java/com/microsoft/azure/functions/worker/http/HttpInvocationCoordinatorTest.java b/src/test/java/com/microsoft/azure/functions/worker/http/HttpInvocationCoordinatorTest.java
index 06a6318..4fd35d3 100644
--- a/src/test/java/com/microsoft/azure/functions/worker/http/HttpInvocationCoordinatorTest.java
+++ b/src/test/java/com/microsoft/azure/functions/worker/http/HttpInvocationCoordinatorTest.java
@@ -8,7 +8,6 @@
import static org.mockito.Mockito.mock;
import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -28,13 +27,14 @@ public void httpArrivesBeforeGrpc() throws Exception {
HttpExchange exchange = mock(HttpExchange.class);
InvocationRequest request = InvocationRequest.newBuilder().setInvocationId(INVOCATION_ID).build();
- CompletableFuture grpcFuture = coordinator.registerHttpArrival(INVOCATION_ID, exchange);
- assertFalse(grpcFuture.isDone(), "gRPC future should still be pending before gRPC arrival");
+ HttpInvocationSlot httpSlot = coordinator.registerHttpArrival(INVOCATION_ID, exchange);
+ assertFalse(httpSlot.grpcArrival().isDone(), "gRPC future should still be pending before gRPC arrival");
- CompletableFuture httpFuture = coordinator.registerGrpcArrival(request);
- assertTrue(httpFuture.isDone(), "HTTP future should already be resolved once gRPC arrives");
- assertSame(exchange, httpFuture.get(1, TimeUnit.SECONDS));
- assertSame(request, grpcFuture.get(1, TimeUnit.SECONDS));
+ HttpInvocationSlot grpcSlot = coordinator.registerGrpcArrival(request);
+ assertSame(httpSlot, grpcSlot, "both registrations should yield the same slot");
+ assertTrue(grpcSlot.httpArrival().isDone(), "HTTP future should already be resolved once gRPC arrives");
+ assertSame(exchange, grpcSlot.httpArrival().get(1, TimeUnit.SECONDS));
+ assertSame(request, httpSlot.grpcArrival().get(1, TimeUnit.SECONDS));
}
@Test
@@ -43,13 +43,14 @@ public void grpcArrivesBeforeHttp() throws Exception {
HttpExchange exchange = mock(HttpExchange.class);
InvocationRequest request = InvocationRequest.newBuilder().setInvocationId(INVOCATION_ID).build();
- CompletableFuture httpFuture = coordinator.registerGrpcArrival(request);
- assertFalse(httpFuture.isDone(), "HTTP future should still be pending before HTTP arrival");
+ HttpInvocationSlot grpcSlot = coordinator.registerGrpcArrival(request);
+ assertFalse(grpcSlot.httpArrival().isDone(), "HTTP future should still be pending before HTTP arrival");
- CompletableFuture grpcFuture = coordinator.registerHttpArrival(INVOCATION_ID, exchange);
- assertTrue(grpcFuture.isDone(), "gRPC future should already be resolved once HTTP arrives");
- assertSame(exchange, httpFuture.get(1, TimeUnit.SECONDS));
- assertSame(request, grpcFuture.get(1, TimeUnit.SECONDS));
+ HttpInvocationSlot httpSlot = coordinator.registerHttpArrival(INVOCATION_ID, exchange);
+ assertSame(grpcSlot, httpSlot, "both registrations should yield the same slot");
+ assertTrue(httpSlot.grpcArrival().isDone(), "gRPC future should already be resolved once HTTP arrives");
+ assertSame(exchange, httpSlot.httpArrival().get(1, TimeUnit.SECONDS));
+ assertSame(request, httpSlot.grpcArrival().get(1, TimeUnit.SECONDS));
}
@Test
@@ -75,14 +76,17 @@ public void releaseInvocationIsIdempotent() {
public void failInvocationPropagatesToFutures() {
HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
HttpExchange exchange = mock(HttpExchange.class);
- CompletableFuture grpcFuture = coordinator.registerHttpArrival(INVOCATION_ID, exchange);
+ HttpInvocationSlot slot = coordinator.registerHttpArrival(INVOCATION_ID, exchange);
IOException cause = new IOException("boom");
coordinator.failInvocation(INVOCATION_ID, cause);
ExecutionException ex = assertThrows(ExecutionException.class,
- () -> grpcFuture.get(1, TimeUnit.SECONDS));
+ () -> slot.grpcArrival().get(1, TimeUnit.SECONDS));
assertSame(cause, ex.getCause());
+ ExecutionException completionEx = assertThrows(ExecutionException.class,
+ () -> slot.completion().get(1, TimeUnit.SECONDS));
+ assertSame(cause, completionEx.getCause());
assertEquals(0, coordinator.activeInvocationCount());
}
@@ -112,24 +116,34 @@ public void independentInvocationsDoNotInterfere() throws Exception {
InvocationRequest reqA = InvocationRequest.newBuilder().setInvocationId("a").build();
InvocationRequest reqB = InvocationRequest.newBuilder().setInvocationId("b").build();
- CompletableFuture grpcA = coordinator.registerHttpArrival("a", exchangeA);
- CompletableFuture grpcB = coordinator.registerHttpArrival("b", exchangeB);
+ HttpInvocationSlot slotA = coordinator.registerHttpArrival("a", exchangeA);
+ HttpInvocationSlot slotB = coordinator.registerHttpArrival("b", exchangeB);
// Resolve only A; B must still be pending.
coordinator.registerGrpcArrival(reqA);
- assertTrue(grpcA.isDone());
- assertFalse(grpcB.isDone());
+ assertTrue(slotA.grpcArrival().isDone());
+ assertFalse(slotB.grpcArrival().isDone());
coordinator.registerGrpcArrival(reqB);
- assertSame(reqA, grpcA.get(1, TimeUnit.SECONDS));
- assertSame(reqB, grpcB.get(1, TimeUnit.SECONDS));
+ assertSame(reqA, slotA.grpcArrival().get(1, TimeUnit.SECONDS));
+ assertSame(reqB, slotB.grpcArrival().get(1, TimeUnit.SECONDS));
}
@Test
public void grpcFutureRemainsPendingUntilHttpArrives() {
HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
InvocationRequest request = InvocationRequest.newBuilder().setInvocationId(INVOCATION_ID).build();
- CompletableFuture httpFuture = coordinator.registerGrpcArrival(request);
+ HttpInvocationSlot slot = coordinator.registerGrpcArrival(request);
// No HTTP arrival; future must time out.
- assertThrows(TimeoutException.class, () -> httpFuture.get(50, TimeUnit.MILLISECONDS));
+ assertThrows(TimeoutException.class, () -> slot.httpArrival().get(50, TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void completionFutureResolvesOnRelease() throws Exception {
+ HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
+ HttpExchange exchange = mock(HttpExchange.class);
+ HttpInvocationSlot slot = coordinator.registerHttpArrival(INVOCATION_ID, exchange);
+ assertFalse(slot.completion().isDone());
+ coordinator.releaseInvocation(INVOCATION_ID);
+ slot.completion().get(1, TimeUnit.SECONDS); // resolves without throwing
}
}
diff --git a/src/test/java/com/microsoft/azure/functions/worker/http/HttpProxyHandlerTest.java b/src/test/java/com/microsoft/azure/functions/worker/http/HttpProxyHandlerTest.java
new file mode 100644
index 0000000..42e9dd6
--- /dev/null
+++ b/src/test/java/com/microsoft/azure/functions/worker/http/HttpProxyHandlerTest.java
@@ -0,0 +1,129 @@
+package com.microsoft.azure.functions.worker.http;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayOutputStream;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.sun.net.httpserver.Headers;
+import com.sun.net.httpserver.HttpExchange;
+
+import org.junit.jupiter.api.Test;
+
+public class HttpProxyHandlerTest {
+
+ private static final String INVOCATION_ID = "abc-123";
+
+ @Test
+ public void rejectsRequestWithoutInvocationIdHeader() throws Exception {
+ HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
+ HttpProxyHandler handler = new HttpProxyHandler(coordinator);
+ ByteArrayOutputStream captured = new ByteArrayOutputStream();
+ HttpExchange exchange = mock(HttpExchange.class);
+ when(exchange.getRequestHeaders()).thenReturn(new Headers());
+ when(exchange.getResponseHeaders()).thenReturn(new Headers());
+ when(exchange.getResponseBody()).thenReturn(captured);
+
+ handler.handle(exchange);
+
+ verify(exchange).sendResponseHeaders(400, captured.size());
+ verify(exchange).close();
+ assertEquals(0, coordinator.activeInvocationCount());
+ }
+
+ @Test
+ public void registersHttpArrivalAndWaitsForCompletion() throws Exception {
+ HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
+ HttpProxyHandler handler = new HttpProxyHandler(coordinator);
+ HttpExchange exchange = mock(HttpExchange.class);
+ Headers requestHeaders = new Headers();
+ requestHeaders.add(HttpProxyHandler.INVOCATION_ID_HEADER, INVOCATION_ID);
+ when(exchange.getRequestHeaders()).thenReturn(requestHeaders);
+
+ AtomicReference handlerError = new AtomicReference<>();
+ CompletableFuture handlerDone = CompletableFuture.runAsync(() -> {
+ try {
+ handler.handle(exchange);
+ } catch (Throwable t) {
+ handlerError.set(t);
+ }
+ });
+
+ // Wait for the handler to register HTTP arrival.
+ long deadline = System.currentTimeMillis() + 1000;
+ while (coordinator.activeInvocationCount() == 0 && System.currentTimeMillis() < deadline) {
+ Thread.sleep(10);
+ }
+ assertEquals(1, coordinator.activeInvocationCount());
+
+ // Simulate the gRPC side finishing the invocation.
+ coordinator.releaseInvocation(INVOCATION_ID);
+
+ handlerDone.get();
+ assertEquals(null, handlerError.get());
+ verify(exchange).close();
+ // The handler must NOT have written any error response - the gRPC side owns the body.
+ verify(exchange, never()).sendResponseHeaders(anyInt(), anyLong());
+ }
+
+ @Test
+ public void respondsWith500WhenInvocationFails() throws Exception {
+ HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
+ HttpProxyHandler handler = new HttpProxyHandler(coordinator);
+ ByteArrayOutputStream captured = new ByteArrayOutputStream();
+ HttpExchange exchange = mock(HttpExchange.class);
+ Headers requestHeaders = new Headers();
+ requestHeaders.add(HttpProxyHandler.INVOCATION_ID_HEADER, INVOCATION_ID);
+ when(exchange.getRequestHeaders()).thenReturn(requestHeaders);
+ when(exchange.getResponseHeaders()).thenReturn(new Headers());
+ when(exchange.getResponseBody()).thenReturn(captured);
+
+ CompletableFuture handlerDone = CompletableFuture.runAsync(() -> {
+ try {
+ handler.handle(exchange);
+ } catch (Exception ignored) {
+ }
+ });
+
+ long deadline = System.currentTimeMillis() + 1000;
+ while (coordinator.activeInvocationCount() == 0 && System.currentTimeMillis() < deadline) {
+ Thread.sleep(10);
+ }
+ coordinator.failInvocation(INVOCATION_ID, new RuntimeException("user fn crashed"));
+
+ handlerDone.get();
+ verify(exchange).sendResponseHeaders(500, captured.size());
+ verify(exchange).close();
+ assertTrue(new String(captured.toByteArray()).contains("user fn crashed"));
+ }
+
+ @Test
+ public void duplicateRegistrationReturns409() throws Exception {
+ HttpInvocationCoordinator coordinator = new HttpInvocationCoordinator();
+ HttpProxyHandler handler = new HttpProxyHandler(coordinator);
+ // Pre-register HTTP arrival to force a duplicate on the next handle() call.
+ HttpExchange first = mock(HttpExchange.class);
+ coordinator.registerHttpArrival(INVOCATION_ID, first);
+
+ ByteArrayOutputStream captured = new ByteArrayOutputStream();
+ HttpExchange second = mock(HttpExchange.class);
+ Headers headers = new Headers();
+ headers.add(HttpProxyHandler.INVOCATION_ID_HEADER, INVOCATION_ID);
+ when(second.getRequestHeaders()).thenReturn(headers);
+ when(second.getResponseHeaders()).thenReturn(new Headers());
+ when(second.getResponseBody()).thenReturn(captured);
+
+ handler.handle(second);
+
+ verify(second).sendResponseHeaders(409, captured.size());
+ verify(second).close();
+ }
+}
From 0ada8b56ac41eb69725a67eb253ebdbd5d097ffc Mon Sep 17 00:00:00 2001
From: Ahmed Muhsin
Date: Wed, 3 Jun 2026 14:40:14 -0500
Subject: [PATCH 03/10] Route HTTP-triggered invocations through embedded proxy
server
Wires HttpProxyServer and HttpInvocationCoordinator into the worker
lifecycle. When the FUNCTIONS_JAVA_DISABLE_HTTP_PROXY environment
variable is not set to true:
- JavaWorkerClient constructs a single HttpProxyServer and a single
HttpInvocationCoordinator, shared across all handler instances.
- WorkerInitRequestHandler starts the proxy server during worker init,
advertises HttpUri = "http://127.0.0.1:" and
RequiresRouteParameters = "true" so the host forwards HTTP traffic
for HTTP-triggered functions directly to the worker.
- InvocationRequestHandler detects requests that contain an HTTP input
binding, rendezvous-es with the matching HttpExchange via the
coordinator, folds the request body into the gRPC InvocationRequest,
invokes the user function, writes the RpcHttp response back to the
exchange, and releases the slot. Failures fail the slot so the HTTP
handler returns 500.
- Non-HTTP invocations (queue, timer, etc.) bypass the coordinator and
execute on the existing gRPC-only path.
- JavaWorkerClient.close() stops the proxy server before tearing down
the gRPC peer so in-flight HTTP handlers drain cleanly.
Fixes #781: requests sent with Transfer-Encoding: chunked previously
arrived at the worker with an empty body because of a null-comparison
bug in the host's RpcHttp content-length handling. Moving body delivery
to HTTP (matching Go / Python / .NET isolated workers) sidesteps that
host bug entirely.
The escape hatch FUNCTIONS_JAVA_DISABLE_HTTP_PROXY=true restores the
old behavior if anything regresses.
Test verification: full test suite (mvn -pl . test) passes 111/111
tests including 35 new tests across HttpProxyServer, coordinator,
handler, and body bridge.
---
.../azure/functions/worker/Constants.java | 7 ++
.../functions/worker/JavaWorkerClient.java | 31 ++++++-
.../handler/InvocationRequestHandler.java | 85 ++++++++++++++++++-
.../handler/WorkerInitRequestHandler.java | 33 +++++++
4 files changed, 150 insertions(+), 6 deletions(-)
diff --git a/src/main/java/com/microsoft/azure/functions/worker/Constants.java b/src/main/java/com/microsoft/azure/functions/worker/Constants.java
index 87b2834..2369b35 100644
--- a/src/main/java/com/microsoft/azure/functions/worker/Constants.java
+++ b/src/main/java/com/microsoft/azure/functions/worker/Constants.java
@@ -18,4 +18,11 @@ private Constants(){}
public static final String JAVA_ENABLE_OPENTELEMETRY = "JAVA_ENABLE_OPENTELEMETRY";
public static final String JAVA_APPLICATIONINSIGHTS_ENABLE_TELEMETRY = "JAVA_APPLICATIONINSIGHTS_ENABLE_TELEMETRY";
public static final String JAVA_ENABLE_SDK_TYPES = "JAVA_ENABLE_SDK_TYPES";
+ /**
+ * If set to "true" (case-insensitive), the worker will NOT start the
+ * embedded HTTP proxy server and will NOT advertise the {@code HttpUri}
+ * capability. Useful as an escape hatch if the proxy path causes problems.
+ * Default: unset (proxy enabled).
+ */
+ public static final String FUNCTIONS_JAVA_DISABLE_HTTP_PROXY = "FUNCTIONS_JAVA_DISABLE_HTTP_PROXY";
}
diff --git a/src/main/java/com/microsoft/azure/functions/worker/JavaWorkerClient.java b/src/main/java/com/microsoft/azure/functions/worker/JavaWorkerClient.java
index 478102b..fca6bf7 100644
--- a/src/main/java/com/microsoft/azure/functions/worker/JavaWorkerClient.java
+++ b/src/main/java/com/microsoft/azure/functions/worker/JavaWorkerClient.java
@@ -12,6 +12,9 @@
import com.microsoft.azure.functions.worker.broker.*;
import com.microsoft.azure.functions.worker.handler.*;
+import com.microsoft.azure.functions.worker.http.HttpInvocationCoordinator;
+import com.microsoft.azure.functions.worker.http.HttpProxyServer;
+import com.microsoft.azure.functions.worker.http.ProxyConfig;
import com.microsoft.azure.functions.worker.reflect.*;
import com.microsoft.azure.functions.rpc.messages.*;
@@ -36,19 +39,28 @@ public JavaWorkerClient(IApplication app) {
this.peer = new AtomicReference<>(null);
this.handlerSuppliers = new HashMap<>();
this.classPathProvider = new FactoryClassLoader().createClassLoaderProvider();
-
+ this.httpInvocationCoordinator = new HttpInvocationCoordinator();
+ this.httpProxyServer = httpProxyEnabled() ? new HttpProxyServer(ProxyConfig.defaults()) : null;
+
this.addHandlers();
}
+ private static boolean httpProxyEnabled() {
+ String value = System.getenv(Constants.FUNCTIONS_JAVA_DISABLE_HTTP_PROXY);
+ return !Boolean.parseBoolean(value);
+ }
+
@PostConstruct
private void addHandlers() {
JavaFunctionBroker broker = new JavaFunctionBroker(classPathProvider);
-
- this.handlerSuppliers.put(StreamingMessage.ContentCase.WORKER_INIT_REQUEST, () -> new WorkerInitRequestHandler(broker));
+
+ this.handlerSuppliers.put(StreamingMessage.ContentCase.WORKER_INIT_REQUEST,
+ () -> new WorkerInitRequestHandler(broker, this.httpProxyServer, this.httpInvocationCoordinator));
this.handlerSuppliers.put(StreamingMessage.ContentCase.WORKER_WARMUP_REQUEST, WorkerWarmupHandler::new);
this.handlerSuppliers.put(StreamingMessage.ContentCase.FUNCTION_ENVIRONMENT_RELOAD_REQUEST, () -> new FunctionEnvironmentReloadRequestHandler(broker));
this.handlerSuppliers.put(StreamingMessage.ContentCase.FUNCTION_LOAD_REQUEST, () -> new FunctionLoadRequestHandler(broker));
- this.handlerSuppliers.put(StreamingMessage.ContentCase.INVOCATION_REQUEST, () -> new InvocationRequestHandler(broker));
+ this.handlerSuppliers.put(StreamingMessage.ContentCase.INVOCATION_REQUEST,
+ () -> new InvocationRequestHandler(broker, this.httpInvocationCoordinator));
this.handlerSuppliers.put(StreamingMessage.ContentCase.WORKER_STATUS_REQUEST, WorkerStatusRequestHandler::new);
this.handlerSuppliers.put(StreamingMessage.ContentCase.WORKER_TERMINATE, WorkerTerminateRequestHandler::new);
}
@@ -68,6 +80,15 @@ void logToHost(LogRecord record, String invocationId) {
@Override
public void close() throws Exception {
+ // Stop accepting HTTP proxy requests before tearing down the gRPC peer
+ // so in-flight HTTP handlers can drain on completion futures cleanly.
+ if (this.httpProxyServer != null) {
+ try {
+ this.httpProxyServer.close();
+ } catch (Exception ex) {
+ logger.log(Level.WARNING, "Failed to close HTTP proxy server cleanly", ex);
+ }
+ }
this.peer.get().close();
this.peer.set(null);
this.channel.shutdownNow();
@@ -143,6 +164,8 @@ private synchronized void send(String requestId, MessageHandler, ?> marshaller
private final AtomicReference peer;
private final Map>> handlerSuppliers;
private final ClassLoaderProvider classPathProvider;
+ private final HttpInvocationCoordinator httpInvocationCoordinator;
+ private final HttpProxyServer httpProxyServer;
/**
* @param functionsUri Host endpoint URI, or null for legacy startup args that only provide host and port.
diff --git a/src/main/java/com/microsoft/azure/functions/worker/handler/InvocationRequestHandler.java b/src/main/java/com/microsoft/azure/functions/worker/handler/InvocationRequestHandler.java
index 434a573..bf4a91b 100644
--- a/src/main/java/com/microsoft/azure/functions/worker/handler/InvocationRequestHandler.java
+++ b/src/main/java/com/microsoft/azure/functions/worker/handler/InvocationRequestHandler.java
@@ -1,20 +1,30 @@
package com.microsoft.azure.functions.worker.handler;
import java.util.*;
+import java.util.concurrent.ExecutionException;
import java.util.logging.*;
import com.microsoft.azure.functions.worker.*;
import com.microsoft.azure.functions.worker.broker.*;
+import com.microsoft.azure.functions.worker.http.HttpBodyBridge;
+import com.microsoft.azure.functions.worker.http.HttpInvocationCoordinator;
+import com.microsoft.azure.functions.worker.http.HttpInvocationSlot;
import com.microsoft.azure.functions.rpc.messages.*;
+import com.sun.net.httpserver.HttpExchange;
public class InvocationRequestHandler extends MessageHandler {
public InvocationRequestHandler(JavaFunctionBroker broker) {
+ this(broker, null);
+ }
+
+ public InvocationRequestHandler(JavaFunctionBroker broker, HttpInvocationCoordinator httpInvocationCoordinator) {
super(StreamingMessage::getInvocationRequest,
InvocationResponse::newBuilder,
InvocationResponse.Builder::setResult,
StreamingMessage.Builder::setInvocationResponse);
assert broker != null;
this.broker = broker;
+ this.httpInvocationCoordinator = httpInvocationCoordinator;
this.invocationLogger = super.getLogger();
}
@@ -29,7 +39,15 @@ String execute(InvocationRequest request, InvocationResponse.Builder response) t
this.invocationLogger = WorkerLogManager.getInvocationLogger(invocationId);
response.setInvocationId(invocationId);
-
+
+ // For HTTP-triggered invocations dispatched via the HttpUri capability, the
+ // gRPC request carries trigger metadata but an empty body. We rendezvous
+ // with the HTTP arrival via the coordinator, fold the body bytes back into
+ // the request, and write the response to the held HttpExchange.
+ if (httpInvocationCoordinator != null && hasHttpInput(request)) {
+ return executeProxiedHttp(request, response, functionId, invocationId);
+ }
+
List outputBindings = new ArrayList<>();
this.broker.invokeMethod(functionId, request, outputBindings).ifPresent(response::setReturnValue);
response.addAllOutputData(outputBindings);
@@ -38,6 +56,69 @@ String execute(InvocationRequest request, InvocationResponse.Builder response) t
this.broker.getMethodName(functionId).orElse("UNKNOWN"), invocationId);
}
- private JavaFunctionBroker broker;
+ private String executeProxiedHttp(InvocationRequest request,
+ InvocationResponse.Builder response,
+ String functionId,
+ String invocationId) throws Exception {
+ HttpInvocationSlot slot = httpInvocationCoordinator.registerGrpcArrival(request);
+ HttpExchange exchange = null;
+ try {
+ try {
+ exchange = slot.httpArrival().get();
+ } catch (ExecutionException ex) {
+ Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
+ throw asException(cause);
+ }
+ InvocationRequest enriched = HttpBodyBridge.enrichRequestWithBody(request, exchange);
+ List outputBindings = new ArrayList<>();
+ this.broker.invokeMethod(functionId, enriched, outputBindings).ifPresent(response::setReturnValue);
+ response.addAllOutputData(outputBindings);
+ RpcHttp httpResponse = extractHttpResponse(response, outputBindings);
+ HttpBodyBridge.writeRpcHttpResponse(exchange, httpResponse);
+ httpInvocationCoordinator.releaseInvocation(invocationId);
+ return String.format("Function \"%s\" (Id: %s) invoked by Java Worker (HTTP proxy)",
+ this.broker.getMethodName(functionId).orElse("UNKNOWN"), invocationId);
+ } catch (Throwable t) {
+ httpInvocationCoordinator.failInvocation(invocationId, t);
+ throw asException(t);
+ }
+ }
+
+ private static boolean hasHttpInput(InvocationRequest request) {
+ for (ParameterBinding binding : request.getInputDataList()) {
+ if (binding.getData().hasHttp()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private static RpcHttp extractHttpResponse(InvocationResponse.Builder response,
+ List outputBindings) {
+ if (response.hasReturnValue() && response.getReturnValue().hasHttp()) {
+ return response.getReturnValue().getHttp();
+ }
+ for (ParameterBinding binding : outputBindings) {
+ if (binding.getData().hasHttp()) {
+ return binding.getData().getHttp();
+ }
+ }
+ // No HTTP response binding produced; respond with an empty 200 so the
+ // host doesn't see a hung connection.
+ return RpcHttp.newBuilder().setStatusCode("200").build();
+ }
+
+ private static Exception asException(Throwable t) {
+ if (t instanceof Exception) {
+ return (Exception) t;
+ }
+ if (t instanceof Error) {
+ throw (Error) t;
+ }
+ return new RuntimeException(t);
+ }
+
+ private final JavaFunctionBroker broker;
+ private final HttpInvocationCoordinator httpInvocationCoordinator;
private Logger invocationLogger;
}
diff --git a/src/main/java/com/microsoft/azure/functions/worker/handler/WorkerInitRequestHandler.java b/src/main/java/com/microsoft/azure/functions/worker/handler/WorkerInitRequestHandler.java
index 618833a..6efff72 100644
--- a/src/main/java/com/microsoft/azure/functions/worker/handler/WorkerInitRequestHandler.java
+++ b/src/main/java/com/microsoft/azure/functions/worker/handler/WorkerInitRequestHandler.java
@@ -3,7 +3,11 @@
import com.microsoft.azure.functions.worker.*;
import com.microsoft.azure.functions.rpc.messages.*;
import com.microsoft.azure.functions.worker.broker.JavaFunctionBroker;
+import com.microsoft.azure.functions.worker.http.HttpInvocationCoordinator;
+import com.microsoft.azure.functions.worker.http.HttpProxyHandler;
+import com.microsoft.azure.functions.worker.http.HttpProxyServer;
+import java.io.IOException;
import java.util.logging.Level;
import static com.microsoft.azure.functions.worker.Constants.JAVA_APPLICATIONINSIGHTS_ENABLE_TELEMETRY;
@@ -11,11 +15,19 @@
public class WorkerInitRequestHandler extends MessageHandler {
public WorkerInitRequestHandler(JavaFunctionBroker broker) {
+ this(broker, null, null);
+ }
+
+ public WorkerInitRequestHandler(JavaFunctionBroker broker,
+ HttpProxyServer httpProxyServer,
+ HttpInvocationCoordinator httpInvocationCoordinator) {
super(StreamingMessage::getWorkerInitRequest,
WorkerInitResponse::newBuilder,
WorkerInitResponse.Builder::setResult,
StreamingMessage.Builder::setWorkerInitResponse);
this.broker = broker;
+ this.httpProxyServer = httpProxyServer;
+ this.httpInvocationCoordinator = httpInvocationCoordinator;
}
@Override
@@ -30,6 +42,8 @@ String execute(WorkerInitRequest request, WorkerInitResponse.Builder response) {
response.putCapabilities("HandlesWorkerTerminateMessage", "HandlesWorkerTerminateMessage");
response.putCapabilities("HandlesWorkerWarmupMessage", "HandlesWorkerWarmupMessage");
+ advertiseHttpProxy(response);
+
if (Boolean.parseBoolean(System.getenv(JAVA_ENABLE_OPENTELEMETRY)) ||
Boolean.parseBoolean(System.getenv(JAVA_APPLICATIONINSIGHTS_ENABLE_TELEMETRY))) {
response.putCapabilities("WorkerOpenTelemetryEnabled", "true");
@@ -41,6 +55,23 @@ String execute(WorkerInitRequest request, WorkerInitResponse.Builder response) {
return "Worker initialized";
}
+ private void advertiseHttpProxy(WorkerInitResponse.Builder response) {
+ if (httpProxyServer == null || httpInvocationCoordinator == null) {
+ return;
+ }
+ try {
+ String uri = httpProxyServer.start(new HttpProxyHandler(httpInvocationCoordinator));
+ response.putCapabilities("HttpUri", uri);
+ response.putCapabilities("RequiresRouteParameters", "true");
+ WorkerLogManager.getSystemLogger().log(Level.INFO,
+ "Java worker HTTP proxy listening on " + uri);
+ } catch (IOException ex) {
+ // Fall back to gRPC-only path: simply do not advertise HttpUri.
+ WorkerLogManager.getSystemLogger().log(Level.WARNING,
+ "Failed to start HTTP proxy server; continuing without HttpUri capability", ex);
+ }
+ }
+
private WorkerMetadata.Builder composeWorkerMetadata(){
WorkerMetadata.Builder workerMetadataBuilder = WorkerMetadata.newBuilder();
workerMetadataBuilder.setRuntimeName("java");
@@ -51,4 +82,6 @@ private WorkerMetadata.Builder composeWorkerMetadata(){
}
private final JavaFunctionBroker broker;
+ private final HttpProxyServer httpProxyServer;
+ private final HttpInvocationCoordinator httpInvocationCoordinator;
}
From 4794ffaad4b4353f076cdfdd905649eef8ac2226 Mon Sep 17 00:00:00 2001
From: Ahmed Muhsin
Date: Wed, 3 Jun 2026 17:12:39 -0500
Subject: [PATCH 04/10] Add streaming output support via
HttpResponseMessage.Builder.bodyStream()
When a function returns an HttpResponseMessage whose body is an
InputStream or HttpResponseMessage.IOConsumer, the HTTP
proxy dispatch path now writes the body directly to the underlying
HttpExchange response stream using chunked transfer-encoding instead
of buffering the entire payload through a protobuf TypedData. This
enables Server-Sent Events, large file downloads, and other long-lived
streaming responses without first materializing the whole body in
memory.
Design highlights:
- RpcHttpDataTarget.toRpcHttpData detects streaming bodies and skips
the RpcUnspecifiedDataTarget serialization step, leaving the RpcHttp
body field unset so the status + headers envelope still flows through
the existing pipeline unchanged.
- BindingDataStore exposes getHttpResponseRawBody() so the dispatch
layer can recover the raw (unserialized) body without ripping apart
the protobuf reply.
- JavaFunctionBroker.invokeMethodForHttpProxy is a new method overload
(not a modification of invokeMethod) that returns an
HttpInvocationOutcome containing both the protobuf reply and the raw
body. The original invokeMethod is left untouched for backward
compatibility with the existing gRPC dispatch path.
- HttpBodyBridge.writeStreamingResponse(InputStream) and
writeStreamingResponse(IOConsumer) handle the actual streaming
write; both use sendResponseHeaders(status, 0) to select chunked
transfer-encoding (or close-delimited for HTTP/1.0 clients) and
close/flush the streams reliably in try-with-resources.
- InvocationRequestHandler.executeProxiedHttp dispatches based on the
raw body type, falling back to the existing buffered writer when the
body is not a streaming type.
The pom.xml is bumped to consume azure-functions-java-core-library
1.4.0-SNAPSHOT, which adds the bodyStream() overloads and the
IOConsumer functional interface.
Tests added: 5 new HttpBodyBridge streaming tests, 8 new
RpcHttpDataTarget tests, 6 new BindingDataStore tests. Total worker
test suite: 130 passing (was 111).
---
pom.xml | 2 +-
.../worker/binding/BindingDataStore.java | 25 +++++
.../worker/binding/RpcHttpDataTarget.java | 25 ++++-
.../worker/broker/JavaFunctionBroker.java | 55 ++++++++++
.../handler/InvocationRequestHandler.java | 29 ++++-
.../functions/worker/http/HttpBodyBridge.java | 52 +++++++++
.../worker/binding/BindingDataStoreTest.java | 96 +++++++++++++++++
.../worker/binding/RpcHttpDataTargetTest.java | 91 ++++++++++++++++
.../worker/http/HttpBodyBridgeTest.java | 100 ++++++++++++++++++
9 files changed, 468 insertions(+), 7 deletions(-)
create mode 100644 src/test/java/com/microsoft/azure/functions/worker/binding/BindingDataStoreTest.java
create mode 100644 src/test/java/com/microsoft/azure/functions/worker/binding/RpcHttpDataTargetTest.java
diff --git a/pom.xml b/pom.xml
index cf3253d..178f9ef 100644
--- a/pom.xml
+++ b/pom.xml
@@ -15,7 +15,7 @@
UTF-81.81.8
- 1.3.0
+ 1.4.0-SNAPSHOT1.1.01.0.22.2.0
diff --git a/src/main/java/com/microsoft/azure/functions/worker/binding/BindingDataStore.java b/src/main/java/com/microsoft/azure/functions/worker/binding/BindingDataStore.java
index 1ccd91c..300006b 100644
--- a/src/main/java/com/microsoft/azure/functions/worker/binding/BindingDataStore.java
+++ b/src/main/java/com/microsoft/azure/functions/worker/binding/BindingDataStore.java
@@ -125,6 +125,31 @@ public Optional getDataTargetTypedValue(String name) throws Exception
});
}
+ /**
+ * Returns the raw, unserialized response body of the HTTP output target, or
+ * {@code null} if no HTTP output target is registered or its body is null.
+ *
+ *
Used by the HTTP proxy path to recover streaming bodies
+ * ({@link java.io.InputStream}, {@code HttpResponseMessage.IOConsumer}) that
+ * cannot be represented in a protobuf {@code TypedData} and must instead be
+ * written directly to the {@code HttpExchange} response stream.
+ */
+ public Object getHttpResponseRawBody() {
+ if (this.promotedTargets == null) {
+ return null;
+ }
+ Map promoted = this.targets.get(this.promotedTargets);
+ if (promoted == null) {
+ return null;
+ }
+ for (DataTarget target : promoted.values()) {
+ if (target instanceof RpcHttpDataTarget) {
+ return ((RpcHttpDataTarget) target).getBody();
+ }
+ }
+ return null;
+ }
+
public Optional getOrAddDataTarget(UUID outputId, String name, Type target, boolean ignoreDefinition) {
DataTarget output = null;
if (this.isDataTargetValid(name, target)) {
diff --git a/src/main/java/com/microsoft/azure/functions/worker/binding/RpcHttpDataTarget.java b/src/main/java/com/microsoft/azure/functions/worker/binding/RpcHttpDataTarget.java
index 16b45f1..17bd7e9 100644
--- a/src/main/java/com/microsoft/azure/functions/worker/binding/RpcHttpDataTarget.java
+++ b/src/main/java/com/microsoft/azure/functions/worker/binding/RpcHttpDataTarget.java
@@ -1,9 +1,11 @@
package com.microsoft.azure.functions.worker.binding;
+import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import com.microsoft.azure.functions.HttpResponseMessage;
+import com.microsoft.azure.functions.HttpResponseMessage.IOConsumer;
import com.microsoft.azure.functions.HttpStatus;
import com.microsoft.azure.functions.HttpStatusType;
import com.microsoft.azure.functions.rpc.messages.RpcHttp;
@@ -37,14 +39,31 @@ public static TypedData.Builder toRpcHttpData(RpcHttpDataTarget response) throws
if (response != null) {
RpcHttp.Builder httpBuilder = RpcHttp.newBuilder().setStatusCode(Integer.toString(response.getStatusCode()));
response.headers.forEach(httpBuilder::putHeaders);
- RpcUnspecifiedDataTarget bodyTarget = new RpcUnspecifiedDataTarget();
- bodyTarget.setValue(response.getBody());
- bodyTarget.computeFromValue().ifPresent(httpBuilder::setBody);
+ Object body = response.getBody();
+ if (isStreamingBody(body)) {
+ // Streaming bodies (InputStream / IOConsumer) cannot be serialized into a
+ // protobuf TypedData; they are written directly to the HTTP response by the
+ // HTTP proxy path. Leave the RpcHttp body unset so downstream code sees an
+ // empty envelope but can still read status + headers.
+ } else {
+ RpcUnspecifiedDataTarget bodyTarget = new RpcUnspecifiedDataTarget();
+ bodyTarget.setValue(body);
+ bodyTarget.computeFromValue().ifPresent(httpBuilder::setBody);
+ }
dataBuilder.setHttp(httpBuilder);
}
return dataBuilder;
}
+ /**
+ * Returns {@code true} if {@code body} is a streaming response body type that
+ * should bypass protobuf serialization and be written directly to the HTTP
+ * response by the worker's HTTP proxy.
+ */
+ static boolean isStreamingBody(Object body) {
+ return body instanceof InputStream || body instanceof IOConsumer;
+ }
+
private static final DataOperations