Skip to content

Commit 2784034

Browse files
authored
Fix: Revert unintended bulk sink serialization from 2839bc2 that broke test case search indexing (#27202)
Commit 2839bc2 cherry-picked PR #27153 ("Improve memory usage for reindex") to 1.12.5, bundled with the revert of PR #27103. PR #27153 is still open/unmerged on main and introduced extra file changes that were never meant for 1.12.5. The bulk sink change from toJsonData(json_string) to toJsonData(map_object) bypassed Jackson's WRITE_DATES_AS_TIMESTAMPS=false setting. During reindex, java.util.Date fields (like tags.appliedAt) were sent as raw epoch Longs instead of ISO strings. OpenSearch dynamically mapped tags.appliedAt as "long". Later, real-time indexing (test case creation via API) sent appliedAt as ISO string — OpenSearch rejected it with mapper_parsing_exception. Test cases with tags were created in DB but silently never indexed in search. Changes: - ElasticSearchBulkSink/OpenSearchBulkSink: reverted serialization back to pojoToJson → string → toJsonData(string), kept Thread.MIN_PRIORITY - EsUtils/OsUtils: removed toJsonData(Object) overload - SearchIndexExecutor: removed contextDataCache, kept Thread.MIN_PRIORITY - ReindexingMetrics: removed counter caching Requires reindex after deployment.
1 parent 5d7f5c1 commit 2784034

6 files changed

Lines changed: 58 additions & 87 deletions

File tree

openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSink.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -254,16 +254,18 @@ private void addEntity(
254254
try {
255255
String entityType = Entity.getEntityTypeFromObject(entity);
256256
Object searchIndexDoc = Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc();
257-
es.co.elastic.clients.json.JsonData jsonData = EsUtils.toJsonData(searchIndexDoc);
257+
String json = JsonUtils.pojoToJson(searchIndexDoc);
258258
String docId = entity.getId().toString();
259259
long estimatedSize =
260-
(long) JsonUtils.pojoToJson(searchIndexDoc).length() + BULK_OPERATION_METADATA_OVERHEAD;
260+
(long) json.getBytes(StandardCharsets.UTF_8).length + BULK_OPERATION_METADATA_OVERHEAD;
261261

262262
BulkOperation operation;
263263
if (recreateIndex) {
264264
operation =
265265
BulkOperation.of(
266-
op -> op.index(idx -> idx.index(indexName).id(docId).document(jsonData)));
266+
op ->
267+
op.index(
268+
idx -> idx.index(indexName).id(docId).document(EsUtils.toJsonData(json))));
267269
} else {
268270
operation =
269271
BulkOperation.of(
@@ -272,7 +274,7 @@ private void addEntity(
272274
upd ->
273275
upd.index(indexName)
274276
.id(docId)
275-
.action(a -> a.doc(jsonData).docAsUpsert(true))));
277+
.action(a -> a.doc(EsUtils.toJsonData(json)).docAsUpsert(true))));
276278
}
277279
if (tracker != null) {
278280
tracker.incrementPendingSink();
@@ -327,14 +329,16 @@ private void addTimeSeriesEntity(
327329
StageStatsTracker tracker) {
328330
try {
329331
Object searchIndexDoc = Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc();
330-
es.co.elastic.clients.json.JsonData jsonData = EsUtils.toJsonData(searchIndexDoc);
332+
String json = JsonUtils.pojoToJson(searchIndexDoc);
331333
String docId = entity.getId().toString();
332334
long estimatedSize =
333-
(long) JsonUtils.pojoToJson(searchIndexDoc).length() + BULK_OPERATION_METADATA_OVERHEAD;
335+
(long) json.getBytes(StandardCharsets.UTF_8).length + BULK_OPERATION_METADATA_OVERHEAD;
334336

335337
BulkOperation operation =
336338
BulkOperation.of(
337-
op -> op.index(idx -> idx.index(indexName).id(docId).document(jsonData)));
339+
op ->
340+
op.index(
341+
idx -> idx.index(indexName).id(docId).document(EsUtils.toJsonData(json))));
338342

339343
if (tracker != null) {
340344
tracker.incrementPendingSink();

openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSink.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -300,23 +300,28 @@ private void addEntity(
300300
try {
301301
String entityType = Entity.getEntityTypeFromObject(entity);
302302
Object searchIndexDoc = Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc();
303-
os.org.opensearch.client.json.JsonData jsonData = OsUtils.toJsonData(searchIndexDoc);
303+
String json = JsonUtils.pojoToJson(searchIndexDoc);
304304
String docId = entity.getId().toString();
305305
long estimatedSize =
306-
(long) JsonUtils.pojoToJson(searchIndexDoc).length() + BULK_OPERATION_METADATA_OVERHEAD;
306+
(long) json.getBytes(StandardCharsets.UTF_8).length + BULK_OPERATION_METADATA_OVERHEAD;
307307

308308
BulkOperation operation;
309309
if (recreateIndex) {
310310
operation =
311311
BulkOperation.of(
312-
op -> op.index(idx -> idx.index(indexName).id(docId).document(jsonData)));
312+
op ->
313+
op.index(
314+
idx -> idx.index(indexName).id(docId).document(OsUtils.toJsonData(json))));
313315
} else {
314316
operation =
315317
BulkOperation.of(
316318
op ->
317319
op.update(
318320
upd ->
319-
upd.index(indexName).id(docId).document(jsonData).docAsUpsert(true)));
321+
upd.index(indexName)
322+
.id(docId)
323+
.document(OsUtils.toJsonData(json))
324+
.docAsUpsert(true)));
320325
}
321326
if (tracker != null) {
322327
tracker.incrementPendingSink();
@@ -371,14 +376,16 @@ private void addTimeSeriesEntity(
371376
StageStatsTracker tracker) {
372377
try {
373378
Object searchIndexDoc = Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc();
374-
os.org.opensearch.client.json.JsonData jsonData = OsUtils.toJsonData(searchIndexDoc);
379+
String json = JsonUtils.pojoToJson(searchIndexDoc);
375380
String docId = entity.getId().toString();
376381
long estimatedSize =
377-
(long) JsonUtils.pojoToJson(searchIndexDoc).length() + BULK_OPERATION_METADATA_OVERHEAD;
382+
(long) json.getBytes(StandardCharsets.UTF_8).length + BULK_OPERATION_METADATA_OVERHEAD;
378383

379384
BulkOperation operation =
380385
BulkOperation.of(
381-
op -> op.index(idx -> idx.index(indexName).id(docId).document(jsonData)));
386+
op ->
387+
op.index(
388+
idx -> idx.index(indexName).id(docId).document(OsUtils.toJsonData(json))));
382389

383390
if (tracker != null) {
384391
tracker.incrementPendingSink();

openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ReindexingMetrics.java

Lines changed: 25 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,6 @@ public class ReindexingMetrics {
3939
// Circuit breaker
4040
private final Map<String, Counter> circuitBreakerCounters = new ConcurrentHashMap<>();
4141

42-
// Stage / promotion counter caches (keyed by "stage:entityType" or "result:entityType")
43-
private final Map<String, Counter> stageSuccessCounters = new ConcurrentHashMap<>();
44-
private final Map<String, Counter> stageFailedCounters = new ConcurrentHashMap<>();
45-
private final Map<String, Counter> stageWarningCounters = new ConcurrentHashMap<>();
46-
private final Map<String, Counter> promotionCounters = new ConcurrentHashMap<>();
47-
4842
// Vector timeouts
4943
private final Counter vectorTimeouts;
5044

@@ -214,41 +208,29 @@ public void recordJobStopped(Timer.Sample sample) {
214208
// --- Stage counters (dynamic tags) ---
215209

216210
public void recordStageSuccess(String stage, String entityType, long count) {
217-
stageSuccessCounters
218-
.computeIfAbsent(
219-
stage + ":" + entityType,
220-
k ->
221-
Counter.builder("reindexing.stage.success")
222-
.description("Records successfully processed per stage")
223-
.tag("stage", stage)
224-
.tag("entity_type", entityType)
225-
.register(meterRegistry))
211+
Counter.builder("reindexing.stage.success")
212+
.description("Records successfully processed per stage")
213+
.tag("stage", stage)
214+
.tag("entity_type", entityType)
215+
.register(meterRegistry)
226216
.increment(count);
227217
}
228218

229219
public void recordStageFailed(String stage, String entityType, long count) {
230-
stageFailedCounters
231-
.computeIfAbsent(
232-
stage + ":" + entityType,
233-
k ->
234-
Counter.builder("reindexing.stage.failed")
235-
.description("Records failed per stage")
236-
.tag("stage", stage)
237-
.tag("entity_type", entityType)
238-
.register(meterRegistry))
220+
Counter.builder("reindexing.stage.failed")
221+
.description("Records failed per stage")
222+
.tag("stage", stage)
223+
.tag("entity_type", entityType)
224+
.register(meterRegistry)
239225
.increment(count);
240226
}
241227

242228
public void recordStageWarnings(String stage, String entityType, long count) {
243-
stageWarningCounters
244-
.computeIfAbsent(
245-
stage + ":" + entityType,
246-
k ->
247-
Counter.builder("reindexing.stage.warnings")
248-
.description("Reader warnings")
249-
.tag("stage", stage)
250-
.tag("entity_type", entityType)
251-
.register(meterRegistry))
229+
Counter.builder("reindexing.stage.warnings")
230+
.description("Reader warnings")
231+
.tag("stage", stage)
232+
.tag("entity_type", entityType)
233+
.register(meterRegistry)
252234
.increment(count);
253235
}
254236

@@ -285,28 +267,20 @@ public void recordBackpressureEvent() {
285267
// --- Promotion metrics (dynamic tags) ---
286268

287269
public void recordPromotionSuccess(String entityType) {
288-
promotionCounters
289-
.computeIfAbsent(
290-
"success:" + entityType,
291-
k ->
292-
Counter.builder("reindexing.promotion")
293-
.description("Index promotion events")
294-
.tag("entity_type", entityType)
295-
.tag("result", "success")
296-
.register(meterRegistry))
270+
Counter.builder("reindexing.promotion")
271+
.description("Index promotion events")
272+
.tag("entity_type", entityType)
273+
.tag("result", "success")
274+
.register(meterRegistry)
297275
.increment();
298276
}
299277

300278
public void recordPromotionFailure(String entityType) {
301-
promotionCounters
302-
.computeIfAbsent(
303-
"failure:" + entityType,
304-
k ->
305-
Counter.builder("reindexing.promotion")
306-
.description("Index promotion events")
307-
.tag("entity_type", entityType)
308-
.tag("result", "failure")
309-
.register(meterRegistry))
279+
Counter.builder("reindexing.promotion")
280+
.description("Index promotion events")
281+
.tag("entity_type", entityType)
282+
.tag("result", "failure")
283+
.register(meterRegistry)
310284
.increment();
311285
}
312286

openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexExecutor.java

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,6 @@ public class SearchIndexExecutor implements AutoCloseable {
134134
private final Map<String, AtomicInteger> entityBatchFailures = new ConcurrentHashMap<>();
135135
private final Set<String> promotedEntities = ConcurrentHashMap.newKeySet();
136136
private final Map<String, StageStatsTracker> sinkTrackers = new ConcurrentHashMap<>();
137-
private final Map<String, Map<String, Object>> contextDataCache = new ConcurrentHashMap<>();
138137
private static final long SINK_SYNC_INTERVAL_MS = 2000;
139138
private final AtomicLong lastSinkSyncTime = new AtomicLong(0);
140139

@@ -236,7 +235,6 @@ private void initializeState() {
236235
entityBatchFailures.clear();
237236
promotedEntities.clear();
238237
sinkTrackers.clear();
239-
contextDataCache.clear();
240238
lastSinkSyncTime.set(0);
241239
initStatsManager();
242240
}
@@ -537,18 +535,14 @@ private void processTask(IndexingTask<?> task) {
537535
}
538536

539537
private Map<String, Object> createContextData(String entityType) {
540-
return contextDataCache.computeIfAbsent(
541-
entityType,
542-
type -> {
543-
Map<String, Object> contextData = new HashMap<>();
544-
contextData.put(ENTITY_TYPE_KEY, type);
545-
contextData.put(RECREATE_INDEX, config.recreateIndex());
546-
contextData.put(RECREATE_CONTEXT, recreateContext);
547-
contextData.put(BulkSink.STATS_TRACKER_CONTEXT_KEY, getSinkTracker(type));
548-
getTargetIndexForEntity(type)
549-
.ifPresent(index -> contextData.put(TARGET_INDEX_KEY, index));
550-
return contextData;
551-
});
538+
Map<String, Object> contextData = new HashMap<>();
539+
contextData.put(ENTITY_TYPE_KEY, entityType);
540+
contextData.put(RECREATE_INDEX, config.recreateIndex());
541+
contextData.put(RECREATE_CONTEXT, recreateContext);
542+
contextData.put(BulkSink.STATS_TRACKER_CONTEXT_KEY, getSinkTracker(entityType));
543+
getTargetIndexForEntity(entityType)
544+
.ifPresent(index -> contextData.put(TARGET_INDEX_KEY, index));
545+
return contextData;
552546
}
553547

554548
private StageStatsTracker getSinkTracker(String entityType) {

openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/EsUtils.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,6 @@ public static Map<String, Object> jsonDataToMap(JsonData jsonData) {
6060
}
6161
}
6262

63-
public static JsonData toJsonData(Object pojo) {
64-
return JsonData.of(pojo);
65-
}
66-
6763
public static JsonData toJsonData(String doc) {
6864
Map<String, Object> docMap;
6965
try {

openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OsUtils.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,6 @@ public static Map<String, Object> jsonDataToMap(JsonData jsonData) {
5555
}
5656
}
5757

58-
public static JsonData toJsonData(Object pojo) {
59-
return JsonData.of(pojo);
60-
}
61-
6258
public static JsonData toJsonData(String doc) {
6359
Map<String, Object> docMap;
6460
try {

0 commit comments

Comments
 (0)