Skip to content

Commit b9feba6

Browse files
committed
MINOR: Fix usage duplicate query error bulk api
1 parent 614c50c commit b9feba6

1 file changed

Lines changed: 27 additions & 6 deletions

File tree

openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@
100100
import java.io.IOException;
101101
import java.io.StringWriter;
102102
import java.net.URI;
103+
import java.sql.SQLException;
103104
import java.time.Instant;
104105
import java.time.LocalDateTime;
105106
import java.time.LocalTime;
@@ -7478,12 +7479,22 @@ private BulkOperationResult bulkCreateOrUpdateEntitiesSequential(
74787479
.withRequest(entity.getFullyQualifiedName())
74797480
.withStatus(Status.OK.getStatusCode()));
74807481
} catch (Exception e) {
7481-
LOG.warn("Failed to process entity in bulk operation", e);
7482-
failedRequests.add(
7483-
new BulkResponse()
7484-
.withRequest(entity.getFullyQualifiedName())
7485-
.withStatus(Status.BAD_REQUEST.getStatusCode())
7486-
.withMessage(e.getMessage()));
7482+
if (isDuplicateKeyException(e)) {
7483+
LOG.debug(
7484+
"Entity already exists (duplicate key), treating as success: {}",
7485+
entity.getFullyQualifiedName());
7486+
successRequests.add(
7487+
new BulkResponse()
7488+
.withRequest(entity.getFullyQualifiedName())
7489+
.withStatus(Status.OK.getStatusCode()));
7490+
} else {
7491+
LOG.warn("Failed to process entity in bulk operation", e);
7492+
failedRequests.add(
7493+
new BulkResponse()
7494+
.withRequest(entity.getFullyQualifiedName())
7495+
.withStatus(Status.BAD_REQUEST.getStatusCode())
7496+
.withMessage(e.getMessage()));
7497+
}
74877498
}
74887499
}
74897500

@@ -7506,6 +7517,16 @@ private BulkOperationResult bulkCreateOrUpdateEntitiesSequential(
75067517
return result;
75077518
}
75087519

7520+
private static boolean isDuplicateKeyException(Exception e) {
7521+
Throwable cause = e.getCause();
7522+
if (cause instanceof SQLException sqlEx) {
7523+
// MySQL: error code 1062 = ER_DUP_ENTRY
7524+
// PostgreSQL: SQL state "23505" = unique_violation
7525+
return sqlEx.getErrorCode() == 1062 || "23505".equals(sqlEx.getSQLState());
7526+
}
7527+
return false;
7528+
}
7529+
75097530
public Optional<BulkOperationResult> getBulkJobStatus(String jobId) {
75107531
CompletableFuture<BulkOperationResult> job = BULK_JOBS.get(jobId);
75117532
if (job == null) {

0 commit comments

Comments
 (0)