Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 135 additions & 32 deletions src/sqlite/cloudsync_sqlite.c
Original file line number Diff line number Diff line change
Expand Up @@ -1144,14 +1144,28 @@ typedef struct {
int value_header_len;
const char *value_data;
int64_t value_data_len;
// Positional-cursor outputs: the resume point AFTER the chunk currently held.
// These live in the per-scan reset region (after eof) so xFilter's bulk memset
// clears them. next_* is the (db_version, seq, frag_offset) a follow-up call
// passes back as resume_* to continue exactly where this chunk stopped.
int64_t next_dbv;
int64_t next_seq;
int64_t next_frag_offset;
bool is_final;
} cloudsync_payload_chunks_cursor;

static int payload_chunks_connect(sqlite3 *db, void *aux, int argc, const char *const *argv, sqlite3_vtab **vtab, char **err) {
UNUSED_PARAMETER(argc); UNUSED_PARAMETER(argv); UNUSED_PARAMETER(err);
int rc = sqlite3_declare_vtab(db,
"CREATE TABLE x(payload BLOB, chunk_index INTEGER, payload_size INTEGER, rows INTEGER, "
"db_version_min INTEGER, db_version_max INTEGER, watermark_db_version INTEGER, "
"since_db_version HIDDEN, site_id HIDDEN, until_db_version HIDDEN, exclude_filter_site_id HIDDEN)");
"since_db_version HIDDEN, site_id HIDDEN, until_db_version HIDDEN, exclude_filter_site_id HIDDEN, "
// Positional-cursor outputs (cols 11..14): the resume point after the
// emitted chunk, plus a final-chunk flag. A stateless /check passes these
// back as the resume_* inputs (cols 15..17) to continue the drain without
// a spool table — O(1) seek per chunk instead of replaying from since.
"next_db_version INTEGER, next_seq INTEGER, next_frag_offset INTEGER, is_final INTEGER, "
"resume_db_version HIDDEN, resume_seq HIDDEN, resume_frag_offset HIDDEN)");
if (rc != SQLITE_OK) return rc;
cloudsync_payload_chunks_vtab *p = sqlite3_malloc64(sizeof(*p));
if (!p) return SQLITE_NOMEM;
Expand Down Expand Up @@ -1185,19 +1199,23 @@ static int payload_chunks_close(sqlite3_vtab_cursor *cursor) {

static int payload_chunks_best_index(sqlite3_vtab *vtab, sqlite3_index_info *idxinfo) {
UNUSED_PARAMETER(vtab);
// Assign argvIndex in a canonical hidden-column order so xFilter can read argv
// in a fixed order regardless of how SQLite presents constraints. idxNum bit k
// is set when handled_cols[k] is bound; xFilter reads argv in this same order.
// bit0=since_db_version(7) bit1=site_id(8) bit2=until_db_version(9)
// bit3=exclude_filter_site_id(10) bit4=resume_db_version(15)
// bit5=resume_seq(16) bit6=resume_frag_offset(17)
static const int handled_cols[] = {7, 8, 9, 10, 15, 16, 17};
int argv_index = 1;
int idxnum = 0;
// Assign argvIndex in canonical hidden-column order (7..10) so xFilter can
// read argv in a fixed order regardless of how SQLite presents constraints.
// Hidden columns: 7=since_db_version, 8=site_id, 9=until_db_version,
// 10=exclude_filter_site_id.
for (int col = 7; col <= 10; ++col) {
for (size_t k = 0; k < sizeof(handled_cols) / sizeof(handled_cols[0]); ++k) {
int col = handled_cols[k];
for (int i = 0; i < idxinfo->nConstraint; ++i) {
struct sqlite3_index_constraint *cn = &idxinfo->aConstraint[i];
if (!cn->usable || cn->op != SQLITE_INDEX_CONSTRAINT_EQ || cn->iColumn != col) continue;
idxinfo->aConstraintUsage[i].argvIndex = argv_index++;
idxinfo->aConstraintUsage[i].omit = 1;
idxnum |= (1 << (col - 7));
idxnum |= (1 << k);
break; // at most one constraint consumed per hidden column
}
}
Expand Down Expand Up @@ -1249,6 +1267,33 @@ static int payload_chunks_plan_fragment(cloudsync_payload_chunks_cursor *c) {
return SQLITE_OK;
}

// Set up fragment state for the current source row (a single value larger than
// max_chunk_size) so emit_fragment can stream it. start_offset is the byte offset
// within the encoded value to resume from (0 when first reaching the value;
// >0 when a positional cursor resumes mid-value). frag_part is derived from the
// offset so the fragment's part index is consistent whether reached by streaming
// or by a seek. The plan (frag_target/frag_count) is a deterministic function of
// the row, so a resumed fragment tiles identically to a streamed one.
static int payload_chunks_begin_fragment(cloudsync_payload_chunks_cursor *c, int64_t start_offset) {
dbvalue_t *col_value = (dbvalue_t *)sqlite3_column_value(c->src, 3);
int type = database_value_type(col_value);
if (type != DBTYPE_TEXT && type != DBTYPE_BLOB) return SQLITE_TOOBIG;
int64_t raw_len = 0;
int header_len = cloudsync_payload_encoded_value_header(col_value, c->value_header, sizeof(c->value_header), &raw_len);
if (header_len <= 0) return SQLITE_ERROR;
c->value_header_len = header_len;
c->value_data = (const char *)database_value_blob(col_value);
c->value_data_len = raw_len;
c->frag_total = header_len + raw_len;
c->frag_offset = start_offset;
int rc = payload_chunks_plan_fragment(c);
if (rc != SQLITE_OK) return rc;
c->frag_part = (c->frag_target > 0) ? (int)(start_offset / c->frag_target) : 0;
c->frag_checksum = cloudsync_payload_encoded_value_checksum(col_value);
c->frag_active = true;
return SQLITE_OK;
}

static int payload_chunks_emit_fragment(cloudsync_payload_chunks_cursor *c) {
cloudsync_context *data = c->vtab->data;
if (c->payload) { cloudsync_memory_free(c->payload); c->payload = NULL; }
Expand Down Expand Up @@ -1321,23 +1366,9 @@ static int payload_chunks_build_next(cloudsync_payload_chunks_cursor *c) {

if ((int64_t)row_size + (int64_t)payload_header_size + CLOUDSYNC_PAYLOAD_CHUNK_SAFETY_MARGIN > max_size) {
if (cloudsync_payload_context_nrows(payload) > 0) break;
dbvalue_t *col_value = (dbvalue_t *)rowv[3];
int type = database_value_type(col_value);
if (type != DBTYPE_TEXT && type != DBTYPE_BLOB) { cloudsync_memory_free(payload); return SQLITE_TOOBIG; }
int64_t raw_len = 0;
int header_len = cloudsync_payload_encoded_value_header(col_value, c->value_header, sizeof(c->value_header), &raw_len);
if (header_len <= 0) { cloudsync_memory_free(payload); return SQLITE_ERROR; }
c->value_header_len = header_len;
c->value_data = (const char *)database_value_blob(col_value);
c->value_data_len = raw_len;
c->frag_total = header_len + raw_len;
c->frag_offset = 0;
c->frag_part = 0;
rc = payload_chunks_plan_fragment(c);
if (rc != SQLITE_OK) { cloudsync_memory_free(payload); return rc; }
c->frag_checksum = cloudsync_payload_encoded_value_checksum(col_value);
c->frag_active = true;
cloudsync_memory_free(payload);
rc = payload_chunks_begin_fragment(c, 0);
if (rc != SQLITE_OK) return rc;
return payload_chunks_emit_fragment(c);
}

Expand All @@ -1360,6 +1391,38 @@ static int payload_chunks_build_next(cloudsync_payload_chunks_cursor *c) {
return SQLITE_OK;
}

// Record the resume point a stateless caller passes back to continue after the
// chunk just built. Reads the source statement, which is positioned at the next
// unconsumed row (or the same row when a value is still mid-fragment). Must be
// called only after build_next produced a chunk (i.e. !eof).
static void payload_chunks_set_next_cursor(cloudsync_payload_chunks_cursor *c) {
if (c->frag_active) {
// Mid-value: resume the same row at the next byte offset.
c->next_dbv = sqlite3_column_int64(c->src, 5);
c->next_seq = sqlite3_column_int64(c->src, 8);
c->next_frag_offset = c->frag_offset;
c->is_final = false;
} else if (c->has_row) {
// Row boundary: the next chunk starts at the current (unconsumed) row.
c->next_dbv = sqlite3_column_int64(c->src, 5);
c->next_seq = sqlite3_column_int64(c->src, 8);
c->next_frag_offset = 0;
c->is_final = false;
} else {
// Stream exhausted: this was the last chunk of the window.
c->next_dbv = c->watermark;
c->next_seq = 0;
c->next_frag_offset = 0;
c->is_final = true;
}
}

static int payload_chunks_advance(cloudsync_payload_chunks_cursor *c) {
int rc = payload_chunks_build_next(c);
if (rc == SQLITE_OK && !c->eof) payload_chunks_set_next_cursor(c);
return rc;
}

static int payload_chunks_filter(sqlite3_vtab_cursor *cursor, int idxnum, const char *idxstr, int argc, sqlite3_value **argv) {
UNUSED_PARAMETER(idxstr); UNUSED_PARAMETER(argc);
cloudsync_payload_chunks_cursor *c = (cloudsync_payload_chunks_cursor *)cursor;
Expand All @@ -1378,6 +1441,13 @@ static int payload_chunks_filter(sqlite3_vtab_cursor *cursor, int idxnum, const
bool site_id_given = false;
int64_t until = 0;
bool exclude = false;
// Positional resume cursor (cols 15..17): when resume_db_version is bound the
// scan starts at (resume_db_version, resume_seq) inclusive and the first chunk
// resumes a mid-value fragment at resume_frag_offset, instead of replaying the
// whole window from `since`. Lets a stateless /check page the stream with an
// O(1) seek per call and no spool table.
bool positional = false;
int64_t resume_dbv = 0, resume_seq = 0, resume_frag = 0;
if (idxnum & 1) since = sqlite3_value_int64(argv[argi++]);
if (idxnum & 2) {
if (sqlite3_value_type(argv[argi]) != SQLITE_NULL) {
Expand All @@ -1389,6 +1459,9 @@ static int payload_chunks_filter(sqlite3_vtab_cursor *cursor, int idxnum, const
}
if (idxnum & 4) until = sqlite3_value_int64(argv[argi++]);
if (idxnum & 8) exclude = (sqlite3_value_int(argv[argi++]) != 0);
if (idxnum & 16) { resume_dbv = sqlite3_value_int64(argv[argi++]); positional = true; }
if (idxnum & 32) resume_seq = sqlite3_value_int64(argv[argi++]);
if (idxnum & 64) resume_frag = sqlite3_value_int64(argv[argi++]);

// Resolve the site filter:
// exclude=true -> all sites except filter_site_id (CHECK path); site required
Expand Down Expand Up @@ -1421,24 +1494,50 @@ static int payload_chunks_filter(sqlite3_vtab_cursor *cursor, int idxnum, const
}
c->watermark = until;

char *sql = sqlite3_mprintf(
"SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq "
"FROM cloudsync_changes WHERE db_version>? AND site_id%s? AND db_version<=? ORDER BY db_version, seq ASC",
site_op);
// Window upper bound is always `until`. The lower bound is either the legacy
// exclusive `since` (db_version > since) or the inclusive positional cursor
// (db_version, seq) >= (resume_dbv, resume_seq).
char *sql;
if (positional) {
sql = sqlite3_mprintf(
"SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq "
"FROM cloudsync_changes WHERE db_version<=? AND site_id%s? AND "
"(db_version>? OR (db_version=? AND seq>=?)) ORDER BY db_version, seq ASC",
site_op);
} else {
sql = sqlite3_mprintf(
"SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq "
"FROM cloudsync_changes WHERE db_version>? AND site_id%s? AND db_version<=? ORDER BY db_version, seq ASC",
site_op);
}
if (!sql) return SQLITE_NOMEM;
int rc = sqlite3_prepare_v2(c->vtab->db, sql, -1, &c->src, NULL);
sqlite3_free(sql);
if (rc != SQLITE_OK) return rc;
sqlite3_bind_int64(c->src, 1, since);
sqlite3_bind_blob(c->src, 2, site_id, site_id_len, SQLITE_TRANSIENT);
sqlite3_bind_int64(c->src, 3, until);
if (positional) {
sqlite3_bind_int64(c->src, 1, until);
sqlite3_bind_blob(c->src, 2, site_id, site_id_len, SQLITE_TRANSIENT);
sqlite3_bind_int64(c->src, 3, resume_dbv);
sqlite3_bind_int64(c->src, 4, resume_dbv);
sqlite3_bind_int64(c->src, 5, resume_seq);
} else {
sqlite3_bind_int64(c->src, 1, since);
sqlite3_bind_blob(c->src, 2, site_id, site_id_len, SQLITE_TRANSIENT);
sqlite3_bind_int64(c->src, 3, until);
}
rc = payload_chunks_step_source(c);
if (rc != SQLITE_OK) return rc;
return payload_chunks_build_next(c);
// Resuming inside a value that was fragmented across chunks: the first row is
// that value; re-establish the fragment plan and skip to resume_frag.
if (positional && resume_frag > 0 && c->has_row) {
rc = payload_chunks_begin_fragment(c, resume_frag);
if (rc != SQLITE_OK) return rc;
}
return payload_chunks_advance(c);
}

static int payload_chunks_next(sqlite3_vtab_cursor *cursor) {
return payload_chunks_build_next((cloudsync_payload_chunks_cursor *)cursor);
return payload_chunks_advance((cloudsync_payload_chunks_cursor *)cursor);
}

static int payload_chunks_eof(sqlite3_vtab_cursor *cursor) {
Expand All @@ -1455,6 +1554,10 @@ static int payload_chunks_column(sqlite3_vtab_cursor *cursor, sqlite3_context *c
case 4: sqlite3_result_int64(ctx, c->dbv_min); break;
case 5: sqlite3_result_int64(ctx, c->dbv_max); break;
case 6: sqlite3_result_int64(ctx, c->watermark); break;
case 11: sqlite3_result_int64(ctx, c->next_dbv); break;
case 12: sqlite3_result_int64(ctx, c->next_seq); break;
case 13: sqlite3_result_int64(ctx, c->next_frag_offset); break;
case 14: sqlite3_result_int(ctx, c->is_final ? 1 : 0); break;
default: sqlite3_result_null(ctx); break;
}
return SQLITE_OK;
Expand Down
Loading
Loading