Skip to content

Commit 2839bc2

Browse files
committed
Cherry Pick : #27153
Reverted : Cherry Pick : Fix: Resolve text fields to .keyword for ES/OS sorting and aggregation  (#27103)
1 parent d6f7956 commit 2839bc2

18 files changed

Lines changed: 149 additions & 224 deletions

openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ElasticSearchBulkSink.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -254,18 +254,16 @@ private void addEntity(
254254
try {
255255
String entityType = Entity.getEntityTypeFromObject(entity);
256256
Object searchIndexDoc = Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc();
257-
String json = JsonUtils.pojoToJson(searchIndexDoc);
257+
es.co.elastic.clients.json.JsonData jsonData = EsUtils.toJsonData(searchIndexDoc);
258258
String docId = entity.getId().toString();
259259
long estimatedSize =
260-
(long) json.getBytes(StandardCharsets.UTF_8).length + BULK_OPERATION_METADATA_OVERHEAD;
260+
(long) JsonUtils.pojoToJson(searchIndexDoc).length() + BULK_OPERATION_METADATA_OVERHEAD;
261261

262262
BulkOperation operation;
263263
if (recreateIndex) {
264264
operation =
265265
BulkOperation.of(
266-
op ->
267-
op.index(
268-
idx -> idx.index(indexName).id(docId).document(EsUtils.toJsonData(json))));
266+
op -> op.index(idx -> idx.index(indexName).id(docId).document(jsonData)));
269267
} else {
270268
operation =
271269
BulkOperation.of(
@@ -274,7 +272,7 @@ private void addEntity(
274272
upd ->
275273
upd.index(indexName)
276274
.id(docId)
277-
.action(a -> a.doc(EsUtils.toJsonData(json)).docAsUpsert(true))));
275+
.action(a -> a.doc(jsonData).docAsUpsert(true))));
278276
}
279277
if (tracker != null) {
280278
tracker.incrementPendingSink();
@@ -329,16 +327,14 @@ private void addTimeSeriesEntity(
329327
StageStatsTracker tracker) {
330328
try {
331329
Object searchIndexDoc = Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc();
332-
String json = JsonUtils.pojoToJson(searchIndexDoc);
330+
es.co.elastic.clients.json.JsonData jsonData = EsUtils.toJsonData(searchIndexDoc);
333331
String docId = entity.getId().toString();
334332
long estimatedSize =
335-
(long) json.getBytes(StandardCharsets.UTF_8).length + BULK_OPERATION_METADATA_OVERHEAD;
333+
(long) JsonUtils.pojoToJson(searchIndexDoc).length() + BULK_OPERATION_METADATA_OVERHEAD;
336334

337335
BulkOperation operation =
338336
BulkOperation.of(
339-
op ->
340-
op.index(
341-
idx -> idx.index(indexName).id(docId).document(EsUtils.toJsonData(json))));
337+
op -> op.index(idx -> idx.index(indexName).id(docId).document(jsonData)));
342338

343339
if (tracker != null) {
344340
tracker.incrementPendingSink();
@@ -576,7 +572,11 @@ public static class CustomBulkProcessor {
576572
this.circuitBreaker = circuitBreaker;
577573
this.scheduler =
578574
Executors.newScheduledThreadPool(
579-
1, Thread.ofPlatform().name("reindex-es-bulk-flush").factory());
575+
1,
576+
Thread.ofPlatform()
577+
.name("reindex-es-bulk-flush")
578+
.priority(Thread.MIN_PRIORITY)
579+
.factory());
580580

581581
scheduler.scheduleAtFixedRate(
582582
this::flushIfNeeded, flushIntervalMillis, flushIntervalMillis, TimeUnit.MILLISECONDS);

openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/OpenSearchBulkSink.java

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -300,28 +300,23 @@ private void addEntity(
300300
try {
301301
String entityType = Entity.getEntityTypeFromObject(entity);
302302
Object searchIndexDoc = Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc();
303-
String json = JsonUtils.pojoToJson(searchIndexDoc);
303+
os.org.opensearch.client.json.JsonData jsonData = OsUtils.toJsonData(searchIndexDoc);
304304
String docId = entity.getId().toString();
305305
long estimatedSize =
306-
(long) json.getBytes(StandardCharsets.UTF_8).length + BULK_OPERATION_METADATA_OVERHEAD;
306+
(long) JsonUtils.pojoToJson(searchIndexDoc).length() + BULK_OPERATION_METADATA_OVERHEAD;
307307

308308
BulkOperation operation;
309309
if (recreateIndex) {
310310
operation =
311311
BulkOperation.of(
312-
op ->
313-
op.index(
314-
idx -> idx.index(indexName).id(docId).document(OsUtils.toJsonData(json))));
312+
op -> op.index(idx -> idx.index(indexName).id(docId).document(jsonData)));
315313
} else {
316314
operation =
317315
BulkOperation.of(
318316
op ->
319317
op.update(
320318
upd ->
321-
upd.index(indexName)
322-
.id(docId)
323-
.document(OsUtils.toJsonData(json))
324-
.docAsUpsert(true)));
319+
upd.index(indexName).id(docId).document(jsonData).docAsUpsert(true)));
325320
}
326321
if (tracker != null) {
327322
tracker.incrementPendingSink();
@@ -376,16 +371,14 @@ private void addTimeSeriesEntity(
376371
StageStatsTracker tracker) {
377372
try {
378373
Object searchIndexDoc = Entity.buildSearchIndex(entityType, entity).buildSearchIndexDoc();
379-
String json = JsonUtils.pojoToJson(searchIndexDoc);
374+
os.org.opensearch.client.json.JsonData jsonData = OsUtils.toJsonData(searchIndexDoc);
380375
String docId = entity.getId().toString();
381376
long estimatedSize =
382-
(long) json.getBytes(StandardCharsets.UTF_8).length + BULK_OPERATION_METADATA_OVERHEAD;
377+
(long) JsonUtils.pojoToJson(searchIndexDoc).length() + BULK_OPERATION_METADATA_OVERHEAD;
383378

384379
BulkOperation operation =
385380
BulkOperation.of(
386-
op ->
387-
op.index(
388-
idx -> idx.index(indexName).id(docId).document(OsUtils.toJsonData(json))));
381+
op -> op.index(idx -> idx.index(indexName).id(docId).document(jsonData)));
389382

390383
if (tracker != null) {
391384
tracker.incrementPendingSink();
@@ -791,7 +784,13 @@ public static class CustomBulkProcessor {
791784
this.totalFailed = totalFailed;
792785
this.statsUpdater = statsUpdater;
793786
this.circuitBreaker = circuitBreaker;
794-
this.scheduler = Executors.newScheduledThreadPool(1);
787+
this.scheduler =
788+
Executors.newScheduledThreadPool(
789+
1,
790+
Thread.ofPlatform()
791+
.name("reindex-os-bulk-flush")
792+
.priority(Thread.MIN_PRIORITY)
793+
.factory());
795794

796795
scheduler.scheduleAtFixedRate(
797796
this::flushIfNeeded, flushIntervalMillis, flushIntervalMillis, TimeUnit.MILLISECONDS);

openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/ReindexingMetrics.java

Lines changed: 51 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,12 @@ public class ReindexingMetrics {
3939
// Circuit breaker
4040
private final Map<String, Counter> circuitBreakerCounters = new ConcurrentHashMap<>();
4141

42+
// Stage / promotion counter caches (keyed by "stage:entityType" or "result:entityType")
43+
private final Map<String, Counter> stageSuccessCounters = new ConcurrentHashMap<>();
44+
private final Map<String, Counter> stageFailedCounters = new ConcurrentHashMap<>();
45+
private final Map<String, Counter> stageWarningCounters = new ConcurrentHashMap<>();
46+
private final Map<String, Counter> promotionCounters = new ConcurrentHashMap<>();
47+
4248
// Vector timeouts
4349
private final Counter vectorTimeouts;
4450

@@ -208,29 +214,41 @@ public void recordJobStopped(Timer.Sample sample) {
208214
// --- Stage counters (dynamic tags) ---
209215

210216
public void recordStageSuccess(String stage, String entityType, long count) {
211-
Counter.builder("reindexing.stage.success")
212-
.description("Records successfully processed per stage")
213-
.tag("stage", stage)
214-
.tag("entity_type", entityType)
215-
.register(meterRegistry)
217+
stageSuccessCounters
218+
.computeIfAbsent(
219+
stage + ":" + entityType,
220+
k ->
221+
Counter.builder("reindexing.stage.success")
222+
.description("Records successfully processed per stage")
223+
.tag("stage", stage)
224+
.tag("entity_type", entityType)
225+
.register(meterRegistry))
216226
.increment(count);
217227
}
218228

219229
public void recordStageFailed(String stage, String entityType, long count) {
220-
Counter.builder("reindexing.stage.failed")
221-
.description("Records failed per stage")
222-
.tag("stage", stage)
223-
.tag("entity_type", entityType)
224-
.register(meterRegistry)
230+
stageFailedCounters
231+
.computeIfAbsent(
232+
stage + ":" + entityType,
233+
k ->
234+
Counter.builder("reindexing.stage.failed")
235+
.description("Records failed per stage")
236+
.tag("stage", stage)
237+
.tag("entity_type", entityType)
238+
.register(meterRegistry))
225239
.increment(count);
226240
}
227241

228242
public void recordStageWarnings(String stage, String entityType, long count) {
229-
Counter.builder("reindexing.stage.warnings")
230-
.description("Reader warnings")
231-
.tag("stage", stage)
232-
.tag("entity_type", entityType)
233-
.register(meterRegistry)
243+
stageWarningCounters
244+
.computeIfAbsent(
245+
stage + ":" + entityType,
246+
k ->
247+
Counter.builder("reindexing.stage.warnings")
248+
.description("Reader warnings")
249+
.tag("stage", stage)
250+
.tag("entity_type", entityType)
251+
.register(meterRegistry))
234252
.increment(count);
235253
}
236254

@@ -267,20 +285,28 @@ public void recordBackpressureEvent() {
267285
// --- Promotion metrics (dynamic tags) ---
268286

269287
public void recordPromotionSuccess(String entityType) {
270-
Counter.builder("reindexing.promotion")
271-
.description("Index promotion events")
272-
.tag("entity_type", entityType)
273-
.tag("result", "success")
274-
.register(meterRegistry)
288+
promotionCounters
289+
.computeIfAbsent(
290+
"success:" + entityType,
291+
k ->
292+
Counter.builder("reindexing.promotion")
293+
.description("Index promotion events")
294+
.tag("entity_type", entityType)
295+
.tag("result", "success")
296+
.register(meterRegistry))
275297
.increment();
276298
}
277299

278300
public void recordPromotionFailure(String entityType) {
279-
Counter.builder("reindexing.promotion")
280-
.description("Index promotion events")
281-
.tag("entity_type", entityType)
282-
.tag("result", "failure")
283-
.register(meterRegistry)
301+
promotionCounters
302+
.computeIfAbsent(
303+
"failure:" + entityType,
304+
k ->
305+
Counter.builder("reindexing.promotion")
306+
.description("Index promotion events")
307+
.tag("entity_type", entityType)
308+
.tag("result", "failure")
309+
.register(meterRegistry))
284310
.increment();
285311
}
286312

openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexExecutor.java

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ public class SearchIndexExecutor implements AutoCloseable {
134134
private final Map<String, AtomicInteger> entityBatchFailures = new ConcurrentHashMap<>();
135135
private final Set<String> promotedEntities = ConcurrentHashMap.newKeySet();
136136
private final Map<String, StageStatsTracker> sinkTrackers = new ConcurrentHashMap<>();
137+
private final Map<String, Map<String, Object>> contextDataCache = new ConcurrentHashMap<>();
137138
private static final long SINK_SYNC_INTERVAL_MS = 2000;
138139
private final AtomicLong lastSinkSyncTime = new AtomicLong(0);
139140

@@ -235,6 +236,7 @@ private void initializeState() {
235236
entityBatchFailures.clear();
236237
promotedEntities.clear();
237238
sinkTrackers.clear();
239+
contextDataCache.clear();
238240
lastSinkSyncTime.set(0);
239241
initStatsManager();
240242
}
@@ -397,17 +399,28 @@ private int initializeQueueAndExecutors(ThreadConfiguration threadConfig, int en
397399
int cappedEntityCount = Math.min(entityCount, maxJobThreads);
398400
jobExecutor =
399401
Executors.newFixedThreadPool(
400-
cappedEntityCount, Thread.ofPlatform().name(threadPrefix + "job-", 0).factory());
402+
cappedEntityCount,
403+
Thread.ofPlatform()
404+
.name(threadPrefix + "job-", 0)
405+
.priority(Thread.MIN_PRIORITY)
406+
.factory());
401407

402408
int finalNumConsumers = Math.min(threadConfig.numConsumers(), MAX_CONSUMER_THREADS);
403409
consumerExecutor =
404410
Executors.newFixedThreadPool(
405-
finalNumConsumers, Thread.ofPlatform().name(threadPrefix + "consumer-", 0).factory());
411+
finalNumConsumers,
412+
Thread.ofPlatform()
413+
.name(threadPrefix + "consumer-", 0)
414+
.priority(Thread.MIN_PRIORITY)
415+
.factory());
406416

407417
producerExecutor =
408418
Executors.newFixedThreadPool(
409419
threadConfig.numProducers(),
410-
Thread.ofPlatform().name(threadPrefix + "producer-", 0).factory());
420+
Thread.ofPlatform()
421+
.name(threadPrefix + "producer-", 0)
422+
.priority(Thread.MIN_PRIORITY)
423+
.factory());
411424

412425
return effectiveQueueSize;
413426
}
@@ -524,14 +537,18 @@ private void processTask(IndexingTask<?> task) {
524537
}
525538

526539
private Map<String, Object> createContextData(String entityType) {
527-
Map<String, Object> contextData = new HashMap<>();
528-
contextData.put(ENTITY_TYPE_KEY, entityType);
529-
contextData.put(RECREATE_INDEX, config.recreateIndex());
530-
contextData.put(RECREATE_CONTEXT, recreateContext);
531-
contextData.put(BulkSink.STATS_TRACKER_CONTEXT_KEY, getSinkTracker(entityType));
532-
getTargetIndexForEntity(entityType)
533-
.ifPresent(index -> contextData.put(TARGET_INDEX_KEY, index));
534-
return contextData;
540+
return contextDataCache.computeIfAbsent(
541+
entityType,
542+
type -> {
543+
Map<String, Object> contextData = new HashMap<>();
544+
contextData.put(ENTITY_TYPE_KEY, type);
545+
contextData.put(RECREATE_INDEX, config.recreateIndex());
546+
contextData.put(RECREATE_CONTEXT, recreateContext);
547+
contextData.put(BulkSink.STATS_TRACKER_CONTEXT_KEY, getSinkTracker(type));
548+
getTargetIndexForEntity(type)
549+
.ifPresent(index -> contextData.put(TARGET_INDEX_KEY, index));
550+
return contextData;
551+
});
535552
}
536553

537554
private StageStatsTracker getSinkTracker(String entityType) {

openmetadata-service/src/main/java/org/openmetadata/service/search/SearchSourceBuilderFactory.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -232,9 +232,7 @@ default boolean isNonFuzzyField(String key) {
232232
Map<String, String> AGGREGATION_FIELD_REMAPS =
233233
Map.of(
234234
"owners.displayName.keyword", "ownerDisplayName",
235-
"owners.displayName", "ownerDisplayName",
236-
"owners.name.keyword", "ownerName",
237-
"owners.name", "ownerName");
235+
"owners.name.keyword", "ownerName");
238236

239237
/**
240238
* Root-level text fields that carry a {@code .keyword} sub-field in all index mappings. Only
@@ -281,4 +279,8 @@ static String resolveFieldForSortOrAggregation(String field) {
281279
}
282280
return field;
283281
}
282+
283+
static String remapAggregationField(String field) {
284+
return AGGREGATION_FIELD_REMAPS.getOrDefault(field, field);
285+
}
284286
}

openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchAggregationManager.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import org.openmetadata.service.search.AggregationManagementClient;
3636
import org.openmetadata.service.search.SearchAggregation;
3737
import org.openmetadata.service.search.SearchIndexUtils;
38-
import org.openmetadata.service.search.SearchSourceBuilderFactory;
3938
import org.openmetadata.service.search.SearchUtils;
4039
import org.openmetadata.service.search.elasticsearch.aggregations.ElasticAggregationsBuilder;
4140
import org.openmetadata.service.search.elasticsearch.queries.ElasticQueryBuilder;
@@ -122,8 +121,7 @@ public Response aggregate(AggregationRequest request) throws IOException {
122121
searchRequestBuilder.query(query);
123122
}
124123

125-
String aggregationField =
126-
SearchSourceBuilderFactory.resolveFieldForSortOrAggregation(request.getFieldName());
124+
String aggregationField = request.getFieldName();
127125
if (aggregationField == null || aggregationField.isBlank()) {
128126
throw new IllegalArgumentException("Aggregation field (fieldName) cannot be null or empty");
129127
}
@@ -262,8 +260,9 @@ public DataQualityReport genericAggregation(
262260
Optional.ofNullable(jsonResponse.getJsonObject("aggregations"));
263261
LOG.info(
264262
"Generic Aggregation - Aggregation results present: {}", aggregationResults.isPresent());
265-
aggregationResults.ifPresent(
266-
jsonObject -> LOG.info("Generic Aggregation - Aggregation results: {}", jsonObject));
263+
if (aggregationResults.isPresent()) {
264+
LOG.info("Generic Aggregation - Aggregation results: {}", aggregationResults.get());
265+
}
267266

268267
return SearchIndexUtils.parseAggregationResults(
269268
aggregationResults, aggregationMetadata.getAggregationMetadata());
@@ -313,7 +312,7 @@ public DataQualityReport genericAggregation(
313312
Query rbacQuery = ((ElasticQueryBuilder) rbacQueryBuilder).buildV2();
314313
if (parsedQuery != null) {
315314
final Query existingQuery = parsedQuery;
316-
parsedQuery =
315+
Query combinedQuery =
317316
Query.of(
318317
qb ->
319318
qb.bool(
@@ -322,6 +321,7 @@ public DataQualityReport genericAggregation(
322321
b.filter(rbacQuery);
323322
return b;
324323
}));
324+
parsedQuery = combinedQuery;
325325
} else {
326326
parsedQuery = rbacQuery;
327327
}
@@ -366,9 +366,10 @@ public DataQualityReport genericAggregation(
366366
LOG.info(
367367
"Generic Aggregation with RBAC - Aggregation results present: {}",
368368
aggregationResults.isPresent());
369-
aggregationResults.ifPresent(
370-
jsonObject ->
371-
LOG.info("Generic Aggregation with RBAC - Aggregation results: {}", jsonObject));
369+
if (aggregationResults.isPresent()) {
370+
LOG.info(
371+
"Generic Aggregation with RBAC - Aggregation results: {}", aggregationResults.get());
372+
}
372373

373374
return SearchIndexUtils.parseAggregationResults(
374375
aggregationResults, aggregationMetadata.getAggregationMetadata());

0 commit comments

Comments
 (0)