Commit 426081b

pmbrull and claude authored
Add hybrid-rrf pipeline check and improve embedding error reporting (#26936)
* Add hybrid-rrf pipeline check and improve embedding error reporting in status validation

  - Add isHybridSearchPipelineAvailable() to OpenSearchVectorService and SearchRepository to verify the hybrid-rrf search pipeline exists in OpenSearch
  - Update getEmbeddingsValidation in SystemRepository to also validate the pipeline when embeddings are enabled, failing with actionable message if missing
  - Retry vector service initialization during validation to surface the actual failure reason (embedding client vs OpenSearch vector service) instead of the generic "not configured properly" message

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Address PR review feedback from Copilot and Gitar

  - Fix bug: retry path now validates embedding generation before checking the hybrid pipeline (gitar)
  - Return Optional<String> from checkHybridSearchPipeline instead of boolean, differentiating 404 (missing) from 5xx/connectivity errors with specific messages (copilot)
  - Include initialization exception message in StepValidation so operators don't need to hunt logs (copilot)
  - Return Optional.empty() for non-OpenSearch implementations since hybrid pipeline is OpenSearch-specific (gitar)
  - Add test for 5xx response case in pipeline check

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Store vector service init error for status reporting

  initializeVectorSearchService() catches exceptions internally and never rethrows, so the try-catch in retryInitAndReportError never captured the error. Store the exception message in vectorServiceInitError field and read it in the validation to include in the health check message.

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Add unit tests for embeddings validation paths in SystemRepository

  Cover the new validation branches: pipeline available, pipeline missing (404), pipeline 5xx, embedding client init failure with error message, vector service init failure with embedding client OK, retry with no recovery, and Elasticsearch not supported. Make getEmbeddingsValidation package-private for testability.

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Clear vectorServiceInitError on successful initialization

  Prevents stale error state after a successful retry.

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Make vectorServiceInitError volatile and skip redundant init retries

  - Add volatile to vectorServiceInitError for thread-safe reads from the unsynchronized getter
  - Skip calling initializeVectorSearchService() during validation when a previous init error is already recorded, avoiding expensive retries and log spam on every /system/validate call

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Address review: @VisibleForTesting, e.toString(), mock Bedrock config

  - Annotate getEmbeddingsValidation with @VisibleForTesting to document the package-private visibility is intentional for testing
  - Use e.toString() instead of e.getMessage() for vectorServiceInitError and pipeline check errors to avoid null messages
  - Mock Bedrock config in tests instead of null to eliminate noisy ERROR logs during test runs

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Allow transient failure recovery, include response body, fix test style

  - Remove guard that prevented init retries after a previous error, allowing recovery from transient failures (e.g., OpenSearch restart). initializeVectorSearchService() is already idempotent.
  - Include truncated response body in non-404 pipeline check errors for actionable diagnostics (e.g., 403 permission details)
  - Rename testEmbeddingsPassButPipelineMissing to testEmbeddingsFailWhenPipelineMissing for clarity
  - Use imported Optional instead of fully qualified in tests

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent ca170ff commit 426081b

5 files changed

Lines changed: 374 additions & 7 deletions

openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/SystemRepository.java

Lines changed: 96 additions & 7 deletions
@@ -8,6 +8,7 @@
 import static org.openmetadata.service.apps.bundles.insights.DataInsightsApp.getDataStreamName;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.annotations.VisibleForTesting;
 import com.unboundid.ldap.sdk.LDAPConnection;
 import com.unboundid.ldap.sdk.LDAPConnectionOptions;
 import com.unboundid.ldap.sdk.SearchResult;
@@ -560,7 +561,8 @@ public ValidationResponse validateSystem(
   public void addExtraValidations(
       OpenMetadataApplicationConfig applicationConfig, ValidationResponse validation) {}
 
-  private StepValidation getEmbeddingsValidation(OpenMetadataApplicationConfig applicationConfig) {
+  @VisibleForTesting
+  StepValidation getEmbeddingsValidation(OpenMetadataApplicationConfig applicationConfig) {
     StepValidation embeddingsValidation = new StepValidation();
     String description = "Embeddings are used to allow Semantic Search";
     SearchRepository searchRepository = Entity.getSearchRepository();
@@ -576,15 +578,24 @@ private StepValidation getEmbeddingsValidation(OpenMetadataApplicationConfig app
     String configMessage = getEmbeddingConfigurationMessage(applicationConfig);
 
     if (searchRepository.getVectorIndexService() == null) {
-      return embeddingsValidation
-          .withDescription(description)
-          .withMessage("Embeddings are not configured properly. " + configMessage)
-          .withPassed(false);
+      return retryInitAndReportError(
+          searchRepository, embeddingsValidation, description, configMessage);
     }
 
     try {
-      return validateEmbeddingGeneration(
-          searchRepository.getEmbeddingClient(), embeddingsValidation, description, configMessage);
+      StepValidation embeddingResult =
+          validateEmbeddingGeneration(
+              searchRepository.getEmbeddingClient(),
+              embeddingsValidation,
+              description,
+              configMessage);
+
+      if (Boolean.FALSE.equals(embeddingResult.getPassed())) {
+        return embeddingResult;
+      }
+
+      return validateHybridSearchPipeline(
+          searchRepository, embeddingResult, description, configMessage);
     } catch (Exception e) {
       LOG.error("Error during embedding generation validation", e);
       return embeddingsValidation
@@ -594,6 +605,84 @@ private StepValidation getEmbeddingsValidation(OpenMetadataApplicationConfig app
     }
   }
 
+  private StepValidation retryInitAndReportError(
+      SearchRepository searchRepository,
+      StepValidation embeddingsValidation,
+      String description,
+      String configMessage) {
+    searchRepository.initializeVectorSearchService();
+
+    if (searchRepository.getVectorIndexService() != null) {
+      try {
+        StepValidation embeddingResult =
+            validateEmbeddingGeneration(
+                searchRepository.getEmbeddingClient(),
+                embeddingsValidation,
+                description,
+                configMessage);
+        if (Boolean.FALSE.equals(embeddingResult.getPassed())) {
+          return embeddingResult;
+        }
+        return validateHybridSearchPipeline(
+            searchRepository, embeddingResult, description, configMessage);
+      } catch (Exception e) {
+        LOG.error("Error during embedding generation validation after retry", e);
+        return embeddingsValidation
+            .withDescription(description)
+            .withMessage("Embedding generation failed: " + e.getMessage() + ". " + configMessage)
+            .withPassed(false);
+      }
+    }
+
+    String initError = searchRepository.getVectorServiceInitError();
+    String errorSuffix = initError != null ? " Error: " + initError + "." : "";
+    if (searchRepository.getEmbeddingClient() == null) {
+      return embeddingsValidation
+          .withDescription(description)
+          .withMessage(
+              "Embedding client could not be initialized."
+                  + errorSuffix
+                  + " Check the embedding provider configuration. "
+                  + configMessage)
+          .withPassed(false);
+    }
+
+    return embeddingsValidation
+        .withDescription(description)
+        .withMessage(
+            "Vector search service could not be initialized. "
+                + "The embedding client is configured but the OpenSearch vector service failed to start."
+                + errorSuffix
+                + " "
+                + configMessage)
+        .withPassed(false);
+  }
+
+  private StepValidation validateHybridSearchPipeline(
+      SearchRepository searchRepository,
+      StepValidation embeddingsValidation,
+      String description,
+      String configMessage) {
+    Optional<String> pipelineError = searchRepository.checkHybridSearchPipeline();
+    if (pipelineError.isPresent()) {
+      return embeddingsValidation
+          .withDescription(description)
+          .withMessage(
+              "Embeddings are working but the hybrid search pipeline check failed: "
+                  + pipelineError.get()
+                  + " "
+                  + configMessage)
+          .withPassed(false);
+    }
+
+    return embeddingsValidation
+        .withDescription(description)
+        .withMessage(
+            String.format(
+                "Embeddings and hybrid search pipeline are working correctly. %s", configMessage))
+        .withPassed(true);
+  }
+
   private StepValidation validateEmbeddingGeneration(
       EmbeddingClient embeddingClient,
       StepValidation embeddingsValidation,
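
The ordering that both the main path and the retry path follow (validate embedding generation first, then check the hybrid pipeline only if that passed) can be sketched in isolation. This is a simplified illustration, not the project's code: `EmbeddingsCheckSketch`, `Result`, and the two `Supplier` parameters are hypothetical stand-ins for `StepValidation`, `validateEmbeddingGeneration`, and `checkHybridSearchPipeline`.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical stand-in for the two-stage validation; not the OpenMetadata classes.
final class EmbeddingsCheckSketch {
  // Minimal result type standing in for StepValidation (assumption).
  record Result(boolean passed, String message) {}

  // Runs the embedding check first; the pipeline check is consulted only if it passed.
  static Result validate(
      Supplier<Result> embeddingCheck, Supplier<Optional<String>> pipelineCheck) {
    Result embedding = embeddingCheck.get();
    if (!embedding.passed()) {
      return embedding; // short-circuit: report the embedding failure unchanged
    }
    return pipelineCheck
        .get()
        .map(
            err ->
                new Result(
                    false,
                    "Embeddings are working but the hybrid search pipeline check failed: " + err))
        .orElseGet(
            () -> new Result(true, "Embeddings and hybrid search pipeline are working correctly."));
  }

  public static void main(String[] args) {
    Result r = validate(() -> new Result(true, "ok"), () -> Optional.of("pipeline missing"));
    System.out.println(r.passed() + " " + r.message());
  }
}
```

An empty `Optional` means "no error" here, which mirrors the convention the diff uses for the pipeline check.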

openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java

Lines changed: 11 additions & 0 deletions
@@ -73,6 +73,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
@@ -210,6 +211,7 @@ public class SearchRepository {
   @Getter private EmbeddingClient embeddingClient;
   @Getter private VectorIndexService vectorIndexService;
   @Getter private VectorEmbeddingHandler vectorEmbeddingHandler;
+  @Getter private volatile String vectorServiceInitError;
   private volatile boolean vectorServiceInitialized = false;
 
   public SearchRepository(ElasticSearchConfiguration config, int maxDBConnections) {
@@ -422,6 +424,7 @@ public synchronized void initializeVectorSearchService() {
       this.vectorEmbeddingHandler = new VectorEmbeddingHandler(vectorIndexService);
 
       vectorServiceInitialized = true;
+      this.vectorServiceInitError = null;
 
       ensureHybridSearchPipeline();
 
@@ -430,6 +433,7 @@ public synchronized void initializeVectorSearchService() {
           cfg.getNaturalLanguageSearch().getEmbeddingProvider(),
           embeddingClient.getDimension());
     } catch (Exception e) {
+      this.vectorServiceInitError = e.toString();
       LOG.error("Failed to initialize vector search service: {}", e.getMessage(), e);
     }
   }
@@ -471,6 +475,13 @@ public void updateHybridSearchPipeline(double keywordWeight, double semanticWeig
     }
   }
 
+  public Optional<String> checkHybridSearchPipeline() {
+    if (vectorIndexService instanceof OpenSearchVectorService openSearchVectorService) {
+      return openSearchVectorService.checkHybridSearchPipeline();
+    }
+    return Optional.empty();
+  }
+
   public IndexMapping getIndexMapping(String entityType) {
     return entityIndexMap.get(entityType);
   }
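
The error-capture pattern in this file (record the failure in a `volatile` field inside the catch block, clear it on success, and prefer `e.toString()` over `e.getMessage()`) can be shown as a minimal standalone sketch. `InitErrorHolderSketch` and its `Runnable` parameter are hypothetical names used only for illustration; the real method takes no arguments and does considerably more work.

```java
// Hypothetical sketch of the init-error capture pattern; names are illustrative only.
final class InitErrorHolderSketch {
  // volatile so that an unsynchronized getter observes the most recent write
  private volatile String initError;

  // Swallows the exception (as the real initializer does) but remembers why it failed.
  synchronized void initialize(Runnable init) {
    try {
      init.run();
      initError = null; // clear any stale error after a successful (re)initialization
    } catch (Exception e) {
      // toString() always includes the exception class name, even when getMessage() is null
      initError = e.toString();
    }
  }

  String getInitError() {
    return initError;
  }

  public static void main(String[] args) {
    InitErrorHolderSketch holder = new InitErrorHolderSketch();
    holder.initialize(
        () -> {
          throw new IllegalStateException();
        });
    System.out.println(holder.getInitError());
  }
}
```

Because the initializer never rethrows, a caller that wraps it in its own try-catch learns nothing; reading the stored field afterwards is what makes the failure reason available to the validation message.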

openmetadata-service/src/main/java/org/openmetadata/service/search/vector/OpenSearchVectorService.java

Lines changed: 47 additions & 0 deletions
@@ -10,6 +10,7 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.openmetadata.schema.EntityInterface;
@@ -105,6 +106,52 @@ public void ensureHybridSearchPipeline(double keywordWeight, double semanticWeig
         semanticWeight);
   }
 
+  public Optional<String> checkHybridSearchPipeline() {
+    try {
+      OpenSearchGenericClient genericClient = client.generic();
+      var request =
+          Requests.builder()
+              .endpoint("/_search/pipeline/" + HYBRID_PIPELINE_NAME)
+              .method("GET")
+              .build();
+      try (var response = genericClient.execute(request)) {
+        int status = response.getStatus();
+        if (status < 400) {
+          return Optional.empty();
+        }
+        if (status == 404) {
+          return Optional.of(
+              "Hybrid search pipeline '"
+                  + HYBRID_PIPELINE_NAME
+                  + "' not found. Run a reindex to create it.");
+        }
+        String detail =
+            response
+                .getBody()
+                .map(
+                    b -> {
+                      try {
+                        String body = new String(b.bodyAsBytes(), StandardCharsets.UTF_8);
+                        return body.length() > 200 ? body.substring(0, 200) : body;
+                      } catch (Exception ignored) {
+                        return "";
+                      }
+                    })
+                .orElse("");
+        return Optional.of(
+            "Unexpected status "
+                + status
+                + " when checking hybrid search pipeline '"
+                + HYBRID_PIPELINE_NAME
+                + "'."
+                + (detail.isEmpty() ? "" : " Response: " + detail));
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to check hybrid search pipeline '{}'", HYBRID_PIPELINE_NAME, e);
+      return Optional.of("Failed to check hybrid search pipeline: " + e.toString());
+    }
+  }
+
   @Override
   public Map<String, Object> generateEmbeddingFields(EntityInterface entity) {
     return VectorDocBuilder.buildEmbeddingFields(entity, embeddingClient);
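
The status handling in `checkHybridSearchPipeline()` can be separated from the HTTP call and expressed as a pure function, which may make the three outcomes (no error, 404, any other failure with a truncated body) easier to test in isolation. The sketch below is an assumption-laden illustration: `PipelineStatusSketch` and `describe` are hypothetical names, the pipeline name is passed in as a parameter because the value of `HYBRID_PIPELINE_NAME` is not shown in the diff, and the 200-character limit is copied from the change above.

```java
import java.util.Optional;

// Hypothetical pure-function version of the status classification; not the project's API.
final class PipelineStatusSketch {
  // Same truncation limit as in the diff above.
  static final int MAX_DETAIL = 200;

  // Returns Optional.empty() when there is nothing to report, otherwise an error message.
  static Optional<String> describe(String pipelineName, int status, String body) {
    if (status < 400) {
      return Optional.empty();
    }
    if (status == 404) {
      return Optional.of(
          "Hybrid search pipeline '" + pipelineName + "' not found. Run a reindex to create it.");
    }
    // Keep at most MAX_DETAIL characters of the response body for diagnostics.
    String detail = body == null ? "" : body;
    if (detail.length() > MAX_DETAIL) {
      detail = detail.substring(0, MAX_DETAIL);
    }
    return Optional.of(
        "Unexpected status "
            + status
            + " when checking hybrid search pipeline '"
            + pipelineName
            + "'."
            + (detail.isEmpty() ? "" : " Response: " + detail));
  }

  public static void main(String[] args) {
    System.out.println(describe("hybrid-rrf", 403, "{\"error\":\"forbidden\"}").orElse("ok"));
  }
}
```

Note that `String.substring` counts UTF-16 code units, so the limit is in code units, not in bytes or user-visible characters.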
