Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""add batch tracking to collection_jobs

Revision ID: 058
Revises: 057
Create Date: 2026-04-13

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "058"
down_revision = "057"
branch_labels = None
depends_on = None


def upgrade():
    """Add batch-tracking columns to collection_jobs and an OpenAI file ID to document."""
    # (table name, column definition) pairs introduced by revision 058.
    additions = [
        (
            "collection_jobs",
            sa.Column(
                "total_batches",
                sa.Integer(),
                nullable=True,
                comment="Total number of batches the documents are split into",
            ),
        ),
        (
            "collection_jobs",
            sa.Column(
                "current_batch_number",
                sa.Integer(),
                nullable=True,
                comment="Which batch is currently being processed (1-indexed)",
            ),
        ),
        (
            "collection_jobs",
            sa.Column(
                "documents_uploaded",
                sa.JSON(),
                nullable=True,
                comment="List of document IDs successfully uploaded so far",
            ),
        ),
        (
            "document",
            sa.Column(
                "openai_file_id",
                sa.String(),
                nullable=True,
                comment="File ID assigned by the LLM provider (e.g. OpenAI file ID) to avoid re-uploading",
            ),
        ),
    ]
    for table_name, column in additions:
        op.add_column(table_name, column)


def downgrade():
    """Drop every column that revision 058 added, mirroring upgrade()."""
    # Same order as the additions in upgrade(); drop order is independent
    # since none of these columns reference each other.
    for table_name, column_name in (
        ("collection_jobs", "total_batches"),
        ("collection_jobs", "current_batch_number"),
        ("collection_jobs", "documents_uploaded"),
        ("document", "openai_file_id"),
    ):
        op.drop_column(table_name, column_name)
2 changes: 1 addition & 1 deletion backend/app/api/docs/documents/upload.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Upload a document to Kaapi.

- If only a file is provided, the document will be uploaded and stored, and its ID will be returned.
- If only a file is provided, the document will be uploaded and stored, and its ID will be returned. The maximum file size allowed for upload is 25 MB.
- If a target format is specified, a transformation job will also be created to transform the document into the target format in the background. The response will include both the uploaded document details and information about the transformation job.
- If a callback URL is provided, you will receive a notification at that URL once the document transformation job is completed.

Expand Down
30 changes: 24 additions & 6 deletions backend/app/celery/tasks/job_execution.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import logging
from typing import Any

import celery
from asgi_correlation_id import correlation_id
from celery import current_task
from opentelemetry import context as otel_context
Expand Down Expand Up @@ -133,16 +131,36 @@ def run_doctransform_job(self, project_id: int, job_id: str, trace_id: str, **kw


@celery_app.task(bind=True, queue="low_priority", priority=1)
@gevent_timeout(settings.CELERY_TASK_SOFT_TIME_LIMIT, "run_create_collection_job")
def run_create_collection_job(
@gevent_timeout(settings.CELERY_TASK_SOFT_TIME_LIMIT, "run_collection_setup_job")
def run_collection_setup_job(
self, project_id: int, job_id: str, trace_id: str, **kwargs
):
from app.services.collections.create_collection import execute_job
from app.services.collections.create_collection import execute_setup_job

_set_trace(trace_id)
return _run_with_otel_parent(
self,
lambda: execute_job(
lambda: execute_setup_job(
project_id=project_id,
job_id=job_id,
task_id=current_task.request.id,
task_instance=self,
**kwargs,
),
)


@celery_app.task(bind=True, queue="low_priority", priority=1)
@gevent_timeout(settings.CELERY_TASK_SOFT_TIME_LIMIT, "run_collection_batch_job")
def run_collection_batch_job(
self, project_id: int, job_id: str, trace_id: str, **kwargs
):
from app.services.collections.create_collection import execute_batch_job

_set_trace(trace_id)
return _run_with_otel_parent(
self,
lambda: execute_batch_job(
project_id=project_id,
job_id=job_id,
task_id=current_task.request.id,
Expand Down
26 changes: 22 additions & 4 deletions backend/app/celery/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,20 +90,38 @@ def start_doctransform_job(
return task_id


def start_create_collection_job(
def start_collection_setup_job(
project_id: int, job_id: str, trace_id: str = "N/A", **kwargs
) -> str:
from app.celery.tasks.job_execution import run_create_collection_job
from app.celery.tasks.job_execution import run_collection_setup_job

task_id = _enqueue_with_trace_context(
run_create_collection_job,
run_collection_setup_job,
project_id=project_id,
job_id=job_id,
trace_id=trace_id,
**kwargs,
)
logger.info(
f"[start_create_collection_job] Started job {job_id} with Celery task {task_id}"
f"[start_collection_setup_job] Started job {job_id} with Celery task {task_id}"
)
return task_id


def start_collection_batch_job(
project_id: int, job_id: str, trace_id: str = "N/A", **kwargs
) -> str:
from app.celery.tasks.job_execution import run_collection_batch_job

task_id = _enqueue_with_trace_context(
run_collection_batch_job,
project_id=project_id,
job_id=job_id,
trace_id=trace_id,
**kwargs,
)
logger.info(
f"[start_collection_batch_job] Started job {job_id} with Celery task {task_id}"
)
Comment thread
nishika26 marked this conversation as resolved.
return task_id

Expand Down
61 changes: 27 additions & 34 deletions backend/app/crud/rag/open_ai.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import json
import logging
import functools as ft
from io import BytesIO
from typing import Iterable
import time

from openai import OpenAI, OpenAIError
from pydantic import BaseModel

from app.core.cloud import CloudStorage
from app.models import Document

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -78,11 +76,6 @@ def clean(self, resource):

class VectorStoreCleaner(ResourceCleaner):
def clean(self, resource):
logger.info(
f"[VectorStoreCleaner.clean] Starting vector store cleanup | {{'vector_store_id': '{resource}'}}"
)
for i in vs_ls(self.client, resource):
self.client.files.delete(i.id)
logger.info(
f"[VectorStoreCleaner.clean] Deleting vector store | {{'vector_store_id': '{resource}'}}"
)
Expand Down Expand Up @@ -118,36 +111,36 @@ def read(self, vector_store_id: str):
def update(
self,
vector_store_id: str,
storage: CloudStorage,
documents: Iterable[Document],
):
for docs in documents:
files = []
for d in docs:
# Get file bytes and wrap in BytesIO for OpenAI API
content = storage.get(d.object_store_url)
f_obj = BytesIO(content)
f_obj.name = d.fname
files.append(f_obj)

logger.info(
f"[OpenAIVectorStoreCrud.update] Uploading files to vector store | {{'vector_store_id': '{vector_store_id}', 'file_count': {len(files)}}}"
)
req = self.client.vector_stores.file_batches.upload_and_poll(
docs: list[Document],
) -> None:
if not docs:
return

try:
batch = self.client.vector_stores.file_batches.upload_and_poll(
vector_store_id=vector_store_id,
files=files,
files=[],
file_ids=[doc.openai_file_id for doc in docs],
Comment on lines +116 to +123
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Validate openai_file_id before calling the batch attach API.

This method now assumes every Document has already been uploaded, but Line 123 still forwards None values straight into file_ids. That turns a local contract violation into a provider-side failure with much worse debugging context.

Suggested patch
     ) -> None:
         if not docs:
             return
+
+        missing_file_ids = [str(doc.id) for doc in docs if not doc.openai_file_id]
+        if missing_file_ids:
+            raise ValueError(
+                "All documents must have openai_file_id before vector store attach: "
+                + ", ".join(missing_file_ids)
+            )
 
         try:
             batch = self.client.vector_stores.file_batches.upload_and_poll(
                 vector_store_id=vector_store_id,
                 files=[],
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/crud/rag/open_ai.py` around lines 116 - 123, The code calls
self.client.vector_stores.file_batches.upload_and_poll with file_ids built from
[doc.openai_file_id for doc in docs] without validating openai_file_id; change
the logic in the method that constructs batch/upload (the block using docs,
openai_file_id, vector_store_id, and upload_and_poll) to filter out any docs
where doc.openai_file_id is falsy before building file_ids, and if any docs were
dropped either log a warning (including identifiers like doc.id or index) or
raise a clear error; after filtering, if the resulting file_ids list is empty,
return early instead of calling upload_and_poll.

)
logger.info(
f"[OpenAIVectorStoreCrud.update] File upload completed | {{'vector_store_id': '{vector_store_id}', 'completed_files': {req.file_counts.completed}, 'total_files': {req.file_counts.total}}}"
except OpenAIError as err:
logger.error(
f"[OpenAIVectorStoreCrud.update] Batch attach failed | "
f"{{'vector_store_id': '{vector_store_id}', 'error': '{str(err)}'}}",
exc_info=True,
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
if req.file_counts.completed != req.file_counts.total:
error_msg = f"OpenAI document processing error: {req.file_counts.completed}/{req.file_counts.total} files completed"
logger.error(
f"[OpenAIVectorStoreCrud.update] Document processing error | {{'vector_store_id': '{vector_store_id}', 'completed_files': {req.file_counts.completed}, 'total_files': {req.file_counts.total}}}"
)
raise InterruptedError(error_msg)
raise

yield from docs
logger.info(
f"[OpenAIVectorStoreCrud.update] Batch complete | "
f"{{'vector_store_id': '{vector_store_id}', "
f"'completed': {batch.file_counts.completed}, 'failed': {batch.file_counts.failed}}}"
)
if batch.file_counts.failed > 0:
raise RuntimeError(
f"Batch attach to vector store {vector_store_id!r} completed with "
f"{batch.file_counts.failed} failed file(s) "
f"({batch.file_counts.completed} succeeded)"
)

def delete(self, vector_store_id: str, retries: int = 3):
if retries < 1:
Expand Down
27 changes: 26 additions & 1 deletion backend/app/models/collection_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,29 @@ class CollectionJob(SQLModel, table=True):
documents: list[str] | None = Field(
default=None,
sa_column=Column(
JSON, nullable=True, comment="List of documents given to make collection"
JSON, nullable=True, comment="List of document IDs given to make collection"
),
)
total_batches: int | None = Field(
default=None,
nullable=True,
sa_column_kwargs={
"comment": "Total number of batches the documents are split into"
},
)
current_batch_number: int | None = Field(
default=None,
nullable=True,
sa_column_kwargs={
"comment": "Which batch is currently being processed (1-indexed)"
},
)
documents_uploaded: list[str] | None = Field(
default=None,
sa_column=Column(
JSON,
nullable=True,
comment="List of document IDs successfully uploaded so far",
),
)

Expand Down Expand Up @@ -139,6 +161,9 @@ class CollectionJobUpdate(SQLModel):
collection_id: UUID | None = None
total_size_mb: float | None = None
trace_id: str | None = None
total_batches: int | None = None
current_batch_number: int | None = None
documents_uploaded: list[str] | None = None


##Response models
Expand Down
5 changes: 5 additions & 0 deletions backend/app/models/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ class Document(DocumentBase, table=True):
description="The size of the document in kilobytes",
sa_column_kwargs={"comment": "Size of the document in kilobytes (KB)"},
)
openai_file_id: str | None = Field(
default=None,
nullable=True,
sa_column_kwargs={"comment": "File ID assigned by OpenAI (avoid re-uploading)"},
)

# Foreign keys
source_document_id: UUID | None = Field(
Expand Down
Loading
Loading