-
Notifications
You must be signed in to change notification settings - Fork 10
Collections: one batch processing per task #786
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
1eabb00
7f5d86f
fa853c3
8e3d29d
bed1d1a
d02bac8
8b7556c
ca6c17e
baaeac2
47525e4
0720a17
2a2e268
eb38c73
0d7f4a1
fb4e874
290d5f0
9f95a76
cb97654
fd37d14
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,62 @@ | ||
| """add batch tracking to collection_jobs | ||
|
|
||
| Revision ID: 058 | ||
| Revises: 057 | ||
| Create Date: 2026-04-13 | ||
|
|
||
| """ | ||
| from alembic import op | ||
| import sqlalchemy as sa | ||
|
|
||
|
|
||
| # revision identifiers, used by Alembic. | ||
| revision = "058" | ||
| down_revision = "057" | ||
| branch_labels = None | ||
| depends_on = None | ||
|
|
||
|
|
||
def upgrade():
    """Apply batch-tracking columns for one-batch-per-task collection processing.

    Adds three nullable columns to ``collection_jobs`` (total batch count,
    current batch pointer, and the list of uploaded document IDs) plus an
    ``openai_file_id`` column on ``document`` so files already uploaded to
    the LLM provider are not re-uploaded.
    """
    # (table, column name, column type, DB comment) specs, applied in order.
    new_columns = [
        (
            "collection_jobs",
            "total_batches",
            sa.Integer(),
            "Total number of batches the documents are split into",
        ),
        (
            "collection_jobs",
            "current_batch_number",
            sa.Integer(),
            "Which batch is currently being processed (1-indexed)",
        ),
        (
            "collection_jobs",
            "documents_uploaded",
            sa.JSON(),
            "List of document IDs successfully uploaded so far",
        ),
        (
            "document",
            "openai_file_id",
            sa.String(),
            "File ID assigned by the LLM provider (e.g. OpenAI file ID) to avoid re-uploading",
        ),
    ]
    for table, name, col_type, comment in new_columns:
        # Every column is nullable so existing rows remain valid without a backfill.
        op.add_column(
            table,
            sa.Column(name, col_type, nullable=True, comment=comment),
        )
|
|
||
|
|
||
def downgrade():
    """Remove the batch-tracking columns added by this revision."""
    # Mirror of upgrade(): drop the columns in the same order they were added.
    for table, column in (
        ("collection_jobs", "total_batches"),
        ("collection_jobs", "current_batch_number"),
        ("collection_jobs", "documents_uploaded"),
        ("document", "openai_file_id"),
    ):
        op.drop_column(table, column)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,13 +1,11 @@ | ||
| import json | ||
| import logging | ||
| import functools as ft | ||
| from io import BytesIO | ||
| from typing import Iterable | ||
| import time | ||
|
|
||
| from openai import OpenAI, OpenAIError | ||
| from pydantic import BaseModel | ||
|
|
||
| from app.core.cloud import CloudStorage | ||
| from app.models import Document | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
@@ -78,11 +76,6 @@ def clean(self, resource): | |
|
|
||
| class VectorStoreCleaner(ResourceCleaner): | ||
| def clean(self, resource): | ||
| logger.info( | ||
| f"[VectorStoreCleaner.clean] Starting vector store cleanup | {{'vector_store_id': '{resource}'}}" | ||
| ) | ||
| for i in vs_ls(self.client, resource): | ||
| self.client.files.delete(i.id) | ||
| logger.info( | ||
| f"[VectorStoreCleaner.clean] Deleting vector store | {{'vector_store_id': '{resource}'}}" | ||
| ) | ||
|
|
@@ -118,36 +111,36 @@ def read(self, vector_store_id: str): | |
| def update( | ||
| self, | ||
| vector_store_id: str, | ||
| storage: CloudStorage, | ||
| documents: Iterable[Document], | ||
| ): | ||
| for docs in documents: | ||
| files = [] | ||
| for d in docs: | ||
| # Get file bytes and wrap in BytesIO for OpenAI API | ||
| content = storage.get(d.object_store_url) | ||
| f_obj = BytesIO(content) | ||
| f_obj.name = d.fname | ||
| files.append(f_obj) | ||
|
|
||
| logger.info( | ||
| f"[OpenAIVectorStoreCrud.update] Uploading files to vector store | {{'vector_store_id': '{vector_store_id}', 'file_count': {len(files)}}}" | ||
| ) | ||
| req = self.client.vector_stores.file_batches.upload_and_poll( | ||
| docs: list[Document], | ||
| ) -> None: | ||
| if not docs: | ||
| return | ||
|
|
||
| try: | ||
| batch = self.client.vector_stores.file_batches.upload_and_poll( | ||
| vector_store_id=vector_store_id, | ||
| files=files, | ||
| files=[], | ||
| file_ids=[doc.openai_file_id for doc in docs], | ||
|
Comment on lines
+116
to
+123
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Validate: this method now assumes every document already has an `openai_file_id`; a document missing one would be passed to the API as `None`. Suggested patch: ) -> None:
if not docs:
return
+
+ missing_file_ids = [str(doc.id) for doc in docs if not doc.openai_file_id]
+ if missing_file_ids:
+ raise ValueError(
+ "All documents must have openai_file_id before vector store attach: "
+ + ", ".join(missing_file_ids)
+ )
try:
batch = self.client.vector_stores.file_batches.upload_and_poll(
vector_store_id=vector_store_id,
files=[],🤖 Prompt for AI Agents |
||
| ) | ||
| logger.info( | ||
| f"[OpenAIVectorStoreCrud.update] File upload completed | {{'vector_store_id': '{vector_store_id}', 'completed_files': {req.file_counts.completed}, 'total_files': {req.file_counts.total}}}" | ||
| except OpenAIError as err: | ||
| logger.error( | ||
| f"[OpenAIVectorStoreCrud.update] Batch attach failed | " | ||
| f"{{'vector_store_id': '{vector_store_id}', 'error': '{str(err)}'}}", | ||
| exc_info=True, | ||
| ) | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
| if req.file_counts.completed != req.file_counts.total: | ||
| error_msg = f"OpenAI document processing error: {req.file_counts.completed}/{req.file_counts.total} files completed" | ||
| logger.error( | ||
| f"[OpenAIVectorStoreCrud.update] Document processing error | {{'vector_store_id': '{vector_store_id}', 'completed_files': {req.file_counts.completed}, 'total_files': {req.file_counts.total}}}" | ||
| ) | ||
| raise InterruptedError(error_msg) | ||
| raise | ||
|
|
||
| yield from docs | ||
| logger.info( | ||
| f"[OpenAIVectorStoreCrud.update] Batch complete | " | ||
| f"{{'vector_store_id': '{vector_store_id}', " | ||
| f"'completed': {batch.file_counts.completed}, 'failed': {batch.file_counts.failed}}}" | ||
| ) | ||
| if batch.file_counts.failed > 0: | ||
| raise RuntimeError( | ||
| f"Batch attach to vector store {vector_store_id!r} completed with " | ||
| f"{batch.file_counts.failed} failed file(s) " | ||
| f"({batch.file_counts.completed} succeeded)" | ||
| ) | ||
|
|
||
| def delete(self, vector_store_id: str, retries: int = 3): | ||
| if retries < 1: | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.