Skip to content

Commit 247bf20

Browse files
mohityadav766manerow
authored andcommitted
Fix Virtual Threads unbounded (#26013)
* Fix Virtual Threads unbounded * Bound all AsyncService paths with semaphore-wrapped executor --------- Co-authored-by: Adrià Manero <adria.estivill@getcollate.io>
1 parent 2359c14 commit 247bf20

5 files changed

Lines changed: 205 additions & 43 deletions

File tree

openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@
167167
import org.openmetadata.service.socket.WebSocketManager;
168168
import org.openmetadata.service.swagger.SwaggerBundle;
169169
import org.openmetadata.service.swagger.SwaggerBundleConfiguration;
170+
import org.openmetadata.service.util.AsyncService;
170171
import org.openmetadata.service.util.CustomParameterNameProvider;
171172
import org.openmetadata.service.util.incidentSeverityClassifier.IncidentSeverityClassifierInterface;
172173
import org.quartz.SchedulerException;
@@ -1099,6 +1100,7 @@ public void start() {
10991100
public void stop() throws InterruptedException, SchedulerException {
11001101
LOG.info("Cache with Id Stats {}", EntityRepository.CACHE_WITH_ID.stats());
11011102
LOG.info("Cache with name Stats {}", EntityRepository.CACHE_WITH_NAME.stats());
1103+
AsyncService.getInstance().shutdown();
11021104
EventPubSub.shutdown();
11031105
AppScheduler.shutDown();
11041106
EventSubscriptionScheduler.shutDown();

openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/DistributedJobParticipant.java

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ public class DistributedJobParticipant implements Managed {
6363
*/
6464
@Getter private UUID currentJobId;
6565

66+
private volatile Thread participantThread;
67+
6668
public DistributedJobParticipant(
6769
CollectionDAO collectionDAO,
6870
SearchRepository searchRepository,
@@ -116,6 +118,18 @@ public void start() {
116118
@Override
117119
public void stop() {
118120
if (running.compareAndSet(true, false)) {
121+
Thread thread = participantThread;
122+
if (thread != null) {
123+
thread.interrupt();
124+
try {
125+
thread.join(10_000);
126+
if (thread.isAlive()) {
127+
LOG.warn("Participant thread did not terminate within 10s after interrupt");
128+
}
129+
} catch (InterruptedException e) {
130+
Thread.currentThread().interrupt();
131+
}
132+
}
119133
notifier.stop();
120134
LOG.info("Stopped distributed job participant on server {}", serverId);
121135
}
@@ -190,22 +204,22 @@ private void joinAndProcessJob(SearchIndexJob job) {
190204
pollingNotifier.setParticipating(true);
191205
}
192206

193-
Thread.ofVirtual()
194-
.name("job-participant-" + job.getId().toString().substring(0, 8))
195-
.start(
196-
() -> {
197-
try {
198-
processJobPartitions(job);
199-
} finally {
200-
participating.set(false);
201-
currentJobId = null;
202-
203-
// Reset polling notifier to idle interval
204-
if (notifier instanceof PollingJobNotifier pollingNotifier) {
205-
pollingNotifier.setParticipating(false);
206-
}
207-
}
208-
});
207+
participantThread =
208+
Thread.ofVirtual()
209+
.name("job-participant-" + job.getId().toString().substring(0, 8))
210+
.start(
211+
() -> {
212+
try {
213+
processJobPartitions(job);
214+
} finally {
215+
currentJobId = null;
216+
if (notifier instanceof PollingJobNotifier pollingNotifier) {
217+
pollingNotifier.setParticipating(false);
218+
}
219+
participating.set(false);
220+
participantThread = null;
221+
}
222+
});
209223
}
210224

211225
/** Process partitions for a job. */

openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/DistributedSearchIndexExecutor.java

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ public static boolean isCoordinatingJob(UUID jobId) {
105105
private final List<PartitionWorker> activeWorkers = new ArrayList<>();
106106
private Thread lockRefreshThread;
107107
private Thread partitionHeartbeatThread;
108+
private Thread staleReclaimerThread;
108109

109110
// App context for WebSocket broadcasts
110111
private UUID appId;
@@ -442,8 +443,7 @@ public ExecutionResult execute(
442443
});
443444
}
444445

445-
// Start stale partition reclaimer in a separate thread
446-
Thread staleReclaimer =
446+
staleReclaimerThread =
447447
Thread.ofVirtual()
448448
.name("stale-reclaimer-" + jobId.toString().substring(0, 8))
449449
.start(() -> runStaleReclaimerLoop(jobId));
@@ -462,14 +462,9 @@ public ExecutionResult execute(
462462
Thread.currentThread().interrupt();
463463
LOG.warn("Execution interrupted for job {}", jobId);
464464
} finally {
465-
// Stop all background threads
466-
staleReclaimer.interrupt();
467-
if (lockRefreshThread != null) {
468-
lockRefreshThread.interrupt();
469-
}
470-
if (partitionHeartbeatThread != null) {
471-
partitionHeartbeatThread.interrupt();
472-
}
465+
interruptAndJoin(staleReclaimerThread, "stale-reclaimer");
466+
interruptAndJoin(lockRefreshThread, "lock-refresh");
467+
interruptAndJoin(partitionHeartbeatThread, "partition-heartbeat");
473468

474469
// Shutdown executor
475470
workerExecutor.shutdown();
@@ -849,6 +844,26 @@ private void markJobAsFailedDueToLostLock(UUID jobId) {
849844
}
850845
}
851846

847+
private static final long THREAD_JOIN_TIMEOUT_MS = 5000;
848+
849+
private void interruptAndJoin(Thread thread, String name) {
850+
if (thread == null) {
851+
return;
852+
}
853+
thread.interrupt();
854+
try {
855+
thread.join(THREAD_JOIN_TIMEOUT_MS);
856+
if (thread.isAlive()) {
857+
LOG.warn(
858+
"Thread {} did not terminate within {}ms after interrupt",
859+
name,
860+
THREAD_JOIN_TIMEOUT_MS);
861+
}
862+
} catch (InterruptedException e) {
863+
Thread.currentThread().interrupt();
864+
}
865+
}
866+
852867
/**
853868
* Request to stop the current job execution.
854869
*/

openmetadata-service/src/main/java/org/openmetadata/service/audit/AuditLogRepository.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.openmetadata.schema.utils.ResultList;
1919
import org.openmetadata.service.Entity;
2020
import org.openmetadata.service.jdbi3.CollectionDAO;
21+
import org.openmetadata.service.util.AsyncService;
2122
import org.openmetadata.service.util.FullyQualifiedName;
2223
import org.openmetadata.service.util.RestUtil;
2324

@@ -122,18 +123,19 @@ public void write(ChangeEvent changeEvent, boolean isBot) {
122123
* populated. Runs asynchronously using a virtual thread to avoid blocking the caller.
123124
*/
124125
public void writeAuthEvent(EventType eventType, String userName, UUID userId) {
125-
Thread.startVirtualThread(
126-
() -> {
127-
ChangeEvent changeEvent =
128-
new ChangeEvent()
129-
.withId(UUID.randomUUID())
130-
.withEventType(eventType)
131-
.withEntityType(Entity.USER)
132-
.withEntityId(userId)
133-
.withUserName(userName)
134-
.withTimestamp(System.currentTimeMillis());
135-
write(changeEvent);
136-
});
126+
AsyncService.getInstance()
127+
.execute(
128+
() -> {
129+
ChangeEvent changeEvent =
130+
new ChangeEvent()
131+
.withId(UUID.randomUUID())
132+
.withEventType(eventType)
133+
.withEntityType(Entity.USER)
134+
.withEntityId(userId)
135+
.withUserName(userName)
136+
.withTimestamp(System.currentTimeMillis());
137+
write(changeEvent);
138+
});
137139
}
138140

139141
/** Determine actor type from username pattern - agents, bots, or regular users. */

openmetadata-service/src/main/java/org/openmetadata/service/util/AsyncService.java

Lines changed: 134 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,66 @@
11
package org.openmetadata.service.util;
22

3+
import java.util.List;
4+
import java.util.concurrent.AbstractExecutorService;
5+
import java.util.concurrent.Callable;
36
import java.util.concurrent.CompletableFuture;
47
import java.util.concurrent.ExecutorService;
58
import java.util.concurrent.Executors;
9+
import java.util.concurrent.Semaphore;
610
import java.util.concurrent.TimeUnit;
711
import java.util.concurrent.TimeoutException;
812
import java.util.function.Supplier;
13+
import lombok.Getter;
914
import lombok.extern.slf4j.Slf4j;
15+
import org.openmetadata.service.OpenMetadataApplicationConfigHolder;
1016

1117
@Slf4j
1218
public class AsyncService {
1319
private static AsyncService instance;
1420
private final ExecutorService executorService;
21+
private final Semaphore concurrencyLimiter;
22+
@Getter private final int maxConcurrency;
1523

16-
// Default retry configuration
1724
private static final int DEFAULT_MAX_RETRIES = 3;
18-
private static final long DEFAULT_INITIAL_RETRY_DELAY_MS = 1000; // 1 second
19-
private static final long DEFAULT_OPERATION_TIMEOUT_SECONDS = 60; // 60 seconds
25+
private static final long DEFAULT_INITIAL_RETRY_DELAY_MS = 1000;
26+
private static final long DEFAULT_OPERATION_TIMEOUT_SECONDS = 60;
27+
28+
private static final long SHUTDOWN_TIMEOUT_SECONDS = 30;
2029

2130
private AsyncService() {
22-
executorService = Executors.newVirtualThreadPerTaskExecutor();
31+
maxConcurrency = resolveMaxConcurrency();
32+
concurrencyLimiter = new Semaphore(maxConcurrency);
33+
executorService =
34+
new BoundedExecutorService(Executors.newVirtualThreadPerTaskExecutor(), concurrencyLimiter);
35+
LOG.info("AsyncService initialized with max concurrency: {}", maxConcurrency);
36+
}
37+
38+
private static int resolveMaxConcurrency() {
39+
String env = System.getenv("ASYNC_SERVICE_MAX_CONCURRENCY");
40+
if (env != null) {
41+
try {
42+
int value = Integer.parseInt(env.trim());
43+
if (value > 0) {
44+
return value;
45+
}
46+
} catch (NumberFormatException ignored) {
47+
}
48+
}
49+
int cpuBudget = Runtime.getRuntime().availableProcessors() * 2;
50+
try {
51+
if (OpenMetadataApplicationConfigHolder.isInitialized()) {
52+
int poolSize =
53+
OpenMetadataApplicationConfigHolder.getInstance().getDataSourceFactory().getMaxSize();
54+
if (poolSize > 0) {
55+
return Math.max(4, Math.min(cpuBudget, poolSize / 3));
56+
}
57+
}
58+
} catch (Exception e) {
59+
LOG.warn(
60+
"Could not determine database pool size, using CPU-based concurrency budget: {}",
61+
e.getMessage());
62+
}
63+
return Math.max(4, cpuBudget);
2364
}
2465

2566
public static synchronized AsyncService getInstance() {
@@ -33,9 +74,38 @@ public ExecutorService getExecutorService() {
3374
return executorService;
3475
}
3576

36-
// Optionally, provide a method to shut down the executor service
77+
public void execute(Runnable task) {
78+
executorService.execute(task);
79+
}
80+
81+
public <T> CompletableFuture<T> submit(Callable<T> task) {
82+
return CompletableFuture.supplyAsync(
83+
() -> {
84+
try {
85+
return task.call();
86+
} catch (RuntimeException e) {
87+
throw e;
88+
} catch (Exception e) {
89+
throw new RuntimeException(e);
90+
}
91+
},
92+
executorService);
93+
}
94+
3795
public void shutdown() {
96+
LOG.info("Shutting down AsyncService executor (max concurrency: {})", maxConcurrency);
3897
executorService.shutdown();
98+
try {
99+
if (!executorService.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
100+
LOG.warn(
101+
"AsyncService executor did not terminate within {}s, forcing shutdown",
102+
SHUTDOWN_TIMEOUT_SECONDS);
103+
executorService.shutdownNow();
104+
}
105+
} catch (InterruptedException e) {
106+
executorService.shutdownNow();
107+
Thread.currentThread().interrupt();
108+
}
39109
}
40110

41111
/**
@@ -161,4 +231,63 @@ private static <T> T executeWithRetry(
161231
throw new RuntimeException(
162232
String.format("Failed to %s %s", operationName.toLowerCase(), context), lastException);
163233
}
234+
235+
/**
236+
* ExecutorService wrapper that enforces concurrency limits via a semaphore. Every task submitted
237+
* through any method (execute, submit, invokeAll, invokeAny) acquires a permit before running and
238+
* releases it on completion. This ensures ALL callers — including those using getExecutorService()
239+
* directly — are bounded.
240+
*/
241+
private static class BoundedExecutorService extends AbstractExecutorService {
242+
private final ExecutorService delegate;
243+
private final Semaphore semaphore;
244+
245+
BoundedExecutorService(ExecutorService delegate, Semaphore semaphore) {
246+
this.delegate = delegate;
247+
this.semaphore = semaphore;
248+
}
249+
250+
@Override
251+
public void execute(Runnable command) {
252+
delegate.execute(
253+
() -> {
254+
try {
255+
semaphore.acquire();
256+
} catch (InterruptedException e) {
257+
Thread.currentThread().interrupt();
258+
throw new RuntimeException("Interrupted waiting for concurrency permit", e);
259+
}
260+
try {
261+
command.run();
262+
} finally {
263+
semaphore.release();
264+
}
265+
});
266+
}
267+
268+
@Override
269+
public void shutdown() {
270+
delegate.shutdown();
271+
}
272+
273+
@Override
274+
public List<Runnable> shutdownNow() {
275+
return delegate.shutdownNow();
276+
}
277+
278+
@Override
279+
public boolean isShutdown() {
280+
return delegate.isShutdown();
281+
}
282+
283+
@Override
284+
public boolean isTerminated() {
285+
return delegate.isTerminated();
286+
}
287+
288+
@Override
289+
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
290+
return delegate.awaitTermination(timeout, unit);
291+
}
292+
}
164293
}

0 commit comments

Comments
 (0)