Skip to content

Commit 1c3a86c

Browse files
committed
fix: switch EventSubscriptionScheduler to JDBC clustered JobStore to prevent duplicate events in HA (#26012)
1 parent b58c369 commit 1c3a86c

2 files changed

Lines changed: 34 additions & 5 deletions

File tree

openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -332,9 +332,12 @@ public void commit(JobExecutionContext jobExecutionContext) {
332332
jobExecutionContext
333333
.getJobDetail()
334334
.getJobDataMap()
335-
.put(ALERT_OFFSET_KEY, eventSubscriptionOffset);
335+
.put(ALERT_OFFSET_KEY, JsonUtils.pojoToJson(eventSubscriptionOffset));
336336

337-
jobExecutionContext.getJobDetail().getJobDataMap().put(ALERT_INFO_KEY, eventSubscription);
337+
jobExecutionContext
338+
.getJobDetail()
339+
.getJobDataMap()
340+
.put(ALERT_INFO_KEY, JsonUtils.pojoToJson(eventSubscription));
338341

339342
AlertMetrics metrics =
340343
new AlertMetrics()

openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionScheduler.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.openmetadata.service.events.subscription.AlertUtil;
5151
import org.openmetadata.service.jdbi3.EntityRepository;
5252
import org.openmetadata.service.jdbi3.EventSubscriptionRepository;
53+
import org.openmetadata.service.jdbi3.locator.ConnectionType;
5354
import org.openmetadata.service.resources.events.subscription.TypedEvent;
5455
import org.openmetadata.service.util.DIContainer;
5556
import org.openmetadata.service.util.OpenMetadataConnectionBuilder;
@@ -102,8 +103,33 @@ private EventSubscriptionScheduler(
102103
Properties properties = new Properties();
103104
properties.put("org.quartz.scheduler.instanceName", SCHEDULER_NAME);
104105
properties.put("org.quartz.scheduler.instanceId", "AUTO");
106+
properties.put("org.quartz.scheduler.skipUpdateCheck", "true");
107+
properties.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
105108
properties.put("org.quartz.threadPool.threadCount", String.valueOf(SCHEDULER_THREAD_COUNT));
106-
properties.put("org.quartz.jobStore.class", "org.quartz.simpl.RAMJobStore");
109+
properties.put("org.quartz.threadPool.threadPriority", "5");
110+
properties.put("org.quartz.jobStore.misfireThreshold", "60000");
111+
properties.put("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
112+
properties.put("org.quartz.jobStore.useProperties", "true");
113+
properties.put("org.quartz.jobStore.tablePrefix", "QRTZ_");
114+
properties.put("org.quartz.jobStore.isClustered", "true");
115+
properties.put("org.quartz.jobStore.dataSource", "myDS");
116+
properties.put("org.quartz.dataSource.myDS.maxConnections", "5");
117+
properties.put("org.quartz.dataSource.myDS.validationQuery", "select 1");
118+
properties.put(
119+
"org.quartz.dataSource.myDS.driver", config.getDataSourceFactory().getDriverClass());
120+
properties.put("org.quartz.dataSource.myDS.URL", config.getDataSourceFactory().getUrl());
121+
properties.put("org.quartz.dataSource.myDS.user", config.getDataSourceFactory().getUser());
122+
properties.put(
123+
"org.quartz.dataSource.myDS.password", config.getDataSourceFactory().getPassword());
124+
if (ConnectionType.MYSQL.label.equals(config.getDataSourceFactory().getDriverClass())) {
125+
properties.put(
126+
"org.quartz.jobStore.driverDelegateClass",
127+
"org.quartz.impl.jdbcjobstore.StdJDBCDelegate");
128+
} else {
129+
properties.put(
130+
"org.quartz.jobStore.driverDelegateClass",
131+
"org.quartz.impl.jdbcjobstore.PostgreSQLDelegate");
132+
}
107133

108134
StdSchedulerFactory factory = new StdSchedulerFactory();
109135
factory.initialize(properties);
@@ -211,9 +237,9 @@ public boolean isSubscriptionRegistered(EventSubscription eventSubscription) {
211237
private JobDetail jobBuilder(
212238
AbstractEventConsumer publisher, EventSubscription eventSubscription, String jobIdentity) {
213239
JobDataMap dataMap = new JobDataMap();
214-
dataMap.put(ALERT_INFO_KEY, eventSubscription);
240+
dataMap.put(ALERT_INFO_KEY, JsonUtils.pojoToJson(eventSubscription));
215241
EventSubscriptionOffset startingOffset = getStartingOffset(eventSubscription.getId());
216-
dataMap.put(ALERT_OFFSET_KEY, startingOffset);
242+
dataMap.put(ALERT_OFFSET_KEY, JsonUtils.pojoToJson(startingOffset));
217243
JobBuilder jobBuilder =
218244
JobBuilder.newJob(publisher.getClass())
219245
.withIdentity(jobIdentity, ALERT_JOB_GROUP)

0 commit comments

Comments
 (0)