Skip to content

Commit 0ea384a

Browse files
committed
Propagate security context to sql-worker thread pool
Wrap scheduled lambdas in both execute() and executeViaTransport() with withCurrentContext() to capture and restore ThreadContext (user identity, permissions, audit trail) on the worker thread. Follows the same pattern as OpenSearchQueryManager.withCurrentContext(). Signed-off-by: Kai Huang <kaihuang@amazon.com> Signed-off-by: Kai Huang <ahkcs@amazon.com>
1 parent 6ce3ac5 commit 0ea384a

1 file changed

Lines changed: 17 additions & 2 deletions

File tree

plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@
1111
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;
1212
import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY;
1313

14+
import java.util.Map;
1415
import org.apache.calcite.rel.RelNode;
1516
import org.apache.calcite.schema.impl.AbstractSchema;
1617
import org.apache.logging.log4j.LogManager;
1718
import org.apache.logging.log4j.Logger;
19+
import org.apache.logging.log4j.ThreadContext;
1820
import org.opensearch.common.unit.TimeValue;
1921
import org.opensearch.core.action.ActionListener;
2022
import org.opensearch.core.rest.RestStatus;
@@ -84,7 +86,7 @@ public void execute(String query, QueryType queryType, RestChannel channel) {
8486
client
8587
.threadPool()
8688
.schedule(
87-
() -> doExecute(query, queryType, channel),
89+
withCurrentContext(() -> doExecute(query, queryType, channel)),
8890
new TimeValue(0),
8991
SQL_WORKER_THREAD_POOL_NAME);
9092
}
@@ -107,7 +109,7 @@ public void executeViaTransport(
107109
client
108110
.threadPool()
109111
.schedule(
110-
() -> doExecuteViaTransport(query, queryType, pplRequest, listener),
112+
withCurrentContext(() -> doExecuteViaTransport(query, queryType, pplRequest, listener)),
111113
new TimeValue(0),
112114
SQL_WORKER_THREAD_POOL_NAME);
113115
}
@@ -298,6 +300,19 @@ private static void recordFailureMetric(QueryType queryType, Exception e) {
298300
}
299301
}
300302

303+
/**
304+
* Capture current thread context and restore it on the worker thread. Ensures security context
305+
* (user identity, permissions) is propagated. Same pattern as {@link
306+
* org.opensearch.sql.opensearch.executor.OpenSearchQueryManager#withCurrentContext}.
307+
*/
308+
private static Runnable withCurrentContext(final Runnable task) {
309+
final Map<String, String> currentContext = ThreadContext.getImmutableContext();
310+
return () -> {
311+
ThreadContext.putAll(currentContext);
312+
task.run();
313+
};
314+
}
315+
301316
private static void reportError(RestChannel channel, Exception e) {
302317
RestStatus status =
303318
isClientError(e) ? RestStatus.BAD_REQUEST : RestStatus.INTERNAL_SERVER_ERROR;

0 commit comments

Comments
 (0)