diff --git a/src/sqlite/cloudsync_sqlite.c b/src/sqlite/cloudsync_sqlite.c index ba7a150..bba0478 100644 --- a/src/sqlite/cloudsync_sqlite.c +++ b/src/sqlite/cloudsync_sqlite.c @@ -1144,6 +1144,14 @@ typedef struct { int value_header_len; const char *value_data; int64_t value_data_len; + // Positional-cursor outputs: the resume point AFTER the chunk currently held. + // These live in the per-scan reset region (after eof) so xFilter's bulk memset + // clears them. next_* is the (db_version, seq, frag_offset) a follow-up call + // passes back as resume_* to continue exactly where this chunk stopped. + int64_t next_dbv; + int64_t next_seq; + int64_t next_frag_offset; + bool is_final; } cloudsync_payload_chunks_cursor; static int payload_chunks_connect(sqlite3 *db, void *aux, int argc, const char *const *argv, sqlite3_vtab **vtab, char **err) { @@ -1151,7 +1159,13 @@ static int payload_chunks_connect(sqlite3 *db, void *aux, int argc, const char * int rc = sqlite3_declare_vtab(db, "CREATE TABLE x(payload BLOB, chunk_index INTEGER, payload_size INTEGER, rows INTEGER, " "db_version_min INTEGER, db_version_max INTEGER, watermark_db_version INTEGER, " - "since_db_version HIDDEN, site_id HIDDEN, until_db_version HIDDEN, exclude_filter_site_id HIDDEN)"); + "since_db_version HIDDEN, site_id HIDDEN, until_db_version HIDDEN, exclude_filter_site_id HIDDEN, " + // Positional-cursor outputs (cols 11..14): the resume point after the + // emitted chunk, plus a final-chunk flag. A stateless /check passes these + // back as the resume_* inputs (cols 15..17) to continue the drain without + // a spool table — O(1) seek per chunk instead of replaying from since. + "next_db_version INTEGER, next_seq INTEGER, next_frag_offset INTEGER, is_final INTEGER, " + "resume_db_version HIDDEN, resume_seq HIDDEN, resume_frag_offset HIDDEN)"); if (rc != SQLITE_OK) return rc; cloudsync_payload_chunks_vtab *p = sqlite3_malloc64(sizeof(*p)); if (!p) return SQLITE_NOMEM; @@ -1185,19 +1199,23 @@ static int payload_chunks_close(sqlite3_vtab_cursor *cursor) { static int payload_chunks_best_index(sqlite3_vtab *vtab, sqlite3_index_info *idxinfo) { UNUSED_PARAMETER(vtab); + // Assign argvIndex in a canonical hidden-column order so xFilter can read argv + // in a fixed order regardless of how SQLite presents constraints. idxNum bit k + // is set when handled_cols[k] is bound; xFilter reads argv in this same order. + // bit0=since_db_version(7) bit1=site_id(8) bit2=until_db_version(9) + // bit3=exclude_filter_site_id(10) bit4=resume_db_version(15) + // bit5=resume_seq(16) bit6=resume_frag_offset(17) + static const int handled_cols[] = {7, 8, 9, 10, 15, 16, 17}; int argv_index = 1; int idxnum = 0; - // Assign argvIndex in canonical hidden-column order (7..10) so xFilter can - // read argv in a fixed order regardless of how SQLite presents constraints. - // Hidden columns: 7=since_db_version, 8=site_id, 9=until_db_version, - // 10=exclude_filter_site_id. - for (int col = 7; col <= 10; ++col) { + for (size_t k = 0; k < sizeof(handled_cols) / sizeof(handled_cols[0]); ++k) { + int col = handled_cols[k]; for (int i = 0; i < idxinfo->nConstraint; ++i) { struct sqlite3_index_constraint *cn = &idxinfo->aConstraint[i]; if (!cn->usable || cn->op != SQLITE_INDEX_CONSTRAINT_EQ || cn->iColumn != col) continue; idxinfo->aConstraintUsage[i].argvIndex = argv_index++; idxinfo->aConstraintUsage[i].omit = 1; - idxnum |= (1 << (col - 7)); + idxnum |= (1 << k); break; // at most one constraint consumed per hidden column } } @@ -1249,6 +1267,33 @@ static int payload_chunks_plan_fragment(cloudsync_payload_chunks_cursor *c) { return SQLITE_OK; } +// Set up fragment state for the current source row (a single value larger than +// max_chunk_size) so emit_fragment can stream it. start_offset is the byte offset +// within the encoded value to resume from (0 when first reaching the value; +// >0 when a positional cursor resumes mid-value). frag_part is derived from the +// offset so the fragment's part index is consistent whether reached by streaming +// or by a seek. The plan (frag_target/frag_count) is a deterministic function of +// the row, so a resumed fragment tiles identically to a streamed one. +static int payload_chunks_begin_fragment(cloudsync_payload_chunks_cursor *c, int64_t start_offset) { + dbvalue_t *col_value = (dbvalue_t *)sqlite3_column_value(c->src, 3); + int type = database_value_type(col_value); + if (type != DBTYPE_TEXT && type != DBTYPE_BLOB) return SQLITE_TOOBIG; + int64_t raw_len = 0; + int header_len = cloudsync_payload_encoded_value_header(col_value, c->value_header, sizeof(c->value_header), &raw_len); + if (header_len <= 0) return SQLITE_ERROR; + c->value_header_len = header_len; + c->value_data = (const char *)database_value_blob(col_value); + c->value_data_len = raw_len; + c->frag_total = header_len + raw_len; + c->frag_offset = start_offset; + int rc = payload_chunks_plan_fragment(c); + if (rc != SQLITE_OK) return rc; + c->frag_part = (c->frag_target > 0) ? (int)(start_offset / c->frag_target) : 0; + c->frag_checksum = cloudsync_payload_encoded_value_checksum(col_value); + c->frag_active = true; + return SQLITE_OK; +} + static int payload_chunks_emit_fragment(cloudsync_payload_chunks_cursor *c) { cloudsync_context *data = c->vtab->data; if (c->payload) { cloudsync_memory_free(c->payload); c->payload = NULL; } @@ -1321,23 +1366,9 @@ static int payload_chunks_build_next(cloudsync_payload_chunks_cursor *c) { if ((int64_t)row_size + (int64_t)payload_header_size + CLOUDSYNC_PAYLOAD_CHUNK_SAFETY_MARGIN > max_size) { if (cloudsync_payload_context_nrows(payload) > 0) break; - dbvalue_t *col_value = (dbvalue_t *)rowv[3]; - int type = database_value_type(col_value); - if (type != DBTYPE_TEXT && type != DBTYPE_BLOB) { cloudsync_memory_free(payload); return SQLITE_TOOBIG; } - int64_t raw_len = 0; - int header_len = cloudsync_payload_encoded_value_header(col_value, c->value_header, sizeof(c->value_header), &raw_len); - if (header_len <= 0) { cloudsync_memory_free(payload); return SQLITE_ERROR; } - c->value_header_len = header_len; - c->value_data = (const char *)database_value_blob(col_value); - c->value_data_len = raw_len; - c->frag_total = header_len + raw_len; - c->frag_offset = 0; - c->frag_part = 0; - rc = payload_chunks_plan_fragment(c); - if (rc != SQLITE_OK) { cloudsync_memory_free(payload); return rc; } - c->frag_checksum = cloudsync_payload_encoded_value_checksum(col_value); - c->frag_active = true; cloudsync_memory_free(payload); + rc = payload_chunks_begin_fragment(c, 0); + if (rc != SQLITE_OK) return rc; return payload_chunks_emit_fragment(c); } @@ -1360,6 +1391,38 @@ static int payload_chunks_build_next(cloudsync_payload_chunks_cursor *c) { return SQLITE_OK; } +// Record the resume point a stateless caller passes back to continue after the +// chunk just built. Reads the source statement, which is positioned at the next +// unconsumed row (or the same row when a value is still mid-fragment). Must be +// called only after build_next produced a chunk (i.e. !eof). +static void payload_chunks_set_next_cursor(cloudsync_payload_chunks_cursor *c) { + if (c->frag_active) { + // Mid-value: resume the same row at the next byte offset. + c->next_dbv = sqlite3_column_int64(c->src, 5); + c->next_seq = sqlite3_column_int64(c->src, 8); + c->next_frag_offset = c->frag_offset; + c->is_final = false; + } else if (c->has_row) { + // Row boundary: the next chunk starts at the current (unconsumed) row. + c->next_dbv = sqlite3_column_int64(c->src, 5); + c->next_seq = sqlite3_column_int64(c->src, 8); + c->next_frag_offset = 0; + c->is_final = false; + } else { + // Stream exhausted: this was the last chunk of the window. + c->next_dbv = c->watermark; + c->next_seq = 0; + c->next_frag_offset = 0; + c->is_final = true; + } +} + +static int payload_chunks_advance(cloudsync_payload_chunks_cursor *c) { + int rc = payload_chunks_build_next(c); + if (rc == SQLITE_OK && !c->eof) payload_chunks_set_next_cursor(c); + return rc; +} + static int payload_chunks_filter(sqlite3_vtab_cursor *cursor, int idxnum, const char *idxstr, int argc, sqlite3_value **argv) { UNUSED_PARAMETER(idxstr); UNUSED_PARAMETER(argc); cloudsync_payload_chunks_cursor *c = (cloudsync_payload_chunks_cursor *)cursor; @@ -1378,6 +1441,13 @@ static int payload_chunks_filter(sqlite3_vtab_cursor *cursor, int idxnum, const bool site_id_given = false; int64_t until = 0; bool exclude = false; + // Positional resume cursor (cols 15..17): when resume_db_version is bound the + // scan starts at (resume_db_version, resume_seq) inclusive and the first chunk + // resumes a mid-value fragment at resume_frag_offset, instead of replaying the + // whole window from `since`. Lets a stateless /check page the stream with an + // O(1) seek per call and no spool table. + bool positional = false; + int64_t resume_dbv = 0, resume_seq = 0, resume_frag = 0; if (idxnum & 1) since = sqlite3_value_int64(argv[argi++]); if (idxnum & 2) { if (sqlite3_value_type(argv[argi]) != SQLITE_NULL) { @@ -1389,6 +1459,9 @@ static int payload_chunks_filter(sqlite3_vtab_cursor *cursor, int idxnum, const } if (idxnum & 4) until = sqlite3_value_int64(argv[argi++]); if (idxnum & 8) exclude = (sqlite3_value_int(argv[argi++]) != 0); + if (idxnum & 16) { resume_dbv = sqlite3_value_int64(argv[argi++]); positional = true; } + if (idxnum & 32) resume_seq = sqlite3_value_int64(argv[argi++]); + if (idxnum & 64) resume_frag = sqlite3_value_int64(argv[argi++]); // Resolve the site filter: // exclude=true -> all sites except filter_site_id (CHECK path); site required @@ -1421,24 +1494,50 @@ static int payload_chunks_filter(sqlite3_vtab_cursor *cursor, int idxnum, const } c->watermark = until; - char *sql = sqlite3_mprintf( - "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " - "FROM cloudsync_changes WHERE db_version>? AND site_id%s? AND db_version<=? ORDER BY db_version, seq ASC", - site_op); + // Window upper bound is always `until`. The lower bound is either the legacy + // exclusive `since` (db_version > since) or the inclusive positional cursor + // (db_version, seq) >= (resume_dbv, resume_seq). + char *sql; + if (positional) { + sql = sqlite3_mprintf( + "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " + "FROM cloudsync_changes WHERE db_version<=? AND site_id%s? AND " + "(db_version>? OR (db_version=? AND seq>=?)) ORDER BY db_version, seq ASC", + site_op); + } else { + sql = sqlite3_mprintf( + "SELECT tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq " + "FROM cloudsync_changes WHERE db_version>? AND site_id%s? AND db_version<=? ORDER BY db_version, seq ASC", + site_op); + } if (!sql) return SQLITE_NOMEM; int rc = sqlite3_prepare_v2(c->vtab->db, sql, -1, &c->src, NULL); sqlite3_free(sql); if (rc != SQLITE_OK) return rc; - sqlite3_bind_int64(c->src, 1, since); - sqlite3_bind_blob(c->src, 2, site_id, site_id_len, SQLITE_TRANSIENT); - sqlite3_bind_int64(c->src, 3, until); + if (positional) { + sqlite3_bind_int64(c->src, 1, until); + sqlite3_bind_blob(c->src, 2, site_id, site_id_len, SQLITE_TRANSIENT); + sqlite3_bind_int64(c->src, 3, resume_dbv); + sqlite3_bind_int64(c->src, 4, resume_dbv); + sqlite3_bind_int64(c->src, 5, resume_seq); + } else { + sqlite3_bind_int64(c->src, 1, since); + sqlite3_bind_blob(c->src, 2, site_id, site_id_len, SQLITE_TRANSIENT); + sqlite3_bind_int64(c->src, 3, until); + } rc = payload_chunks_step_source(c); if (rc != SQLITE_OK) return rc; - return payload_chunks_build_next(c); + // Resuming inside a value that was fragmented across chunks: the first row is + // that value; re-establish the fragment plan and skip to resume_frag. + if (positional && resume_frag > 0 && c->has_row) { + rc = payload_chunks_begin_fragment(c, resume_frag); + if (rc != SQLITE_OK) return rc; + } + return payload_chunks_advance(c); } static int payload_chunks_next(sqlite3_vtab_cursor *cursor) { - return payload_chunks_build_next((cloudsync_payload_chunks_cursor *)cursor); + return payload_chunks_advance((cloudsync_payload_chunks_cursor *)cursor); } static int payload_chunks_eof(sqlite3_vtab_cursor *cursor) { @@ -1455,6 +1554,10 @@ static int payload_chunks_column(sqlite3_vtab_cursor *cursor, sqlite3_context *c case 4: sqlite3_result_int64(ctx, c->dbv_min); break; case 5: sqlite3_result_int64(ctx, c->dbv_max); break; case 6: sqlite3_result_int64(ctx, c->watermark); break; + case 11: sqlite3_result_int64(ctx, c->next_dbv); break; + case 12: sqlite3_result_int64(ctx, c->next_seq); break; + case 13: sqlite3_result_int64(ctx, c->next_frag_offset); break; + case 14: sqlite3_result_int(ctx, c->is_final ? 1 : 0); break; default: sqlite3_result_null(ctx); break; } return SQLITE_OK; diff --git a/test/unit.c b/test/unit.c index c23247c..88c4eff 100644 --- a/test/unit.c +++ b/test/unit.c @@ -12556,6 +12556,153 @@ bool do_test_payload_chunks_split_dbversion (bool print_result, bool cleanup_dat return result; } +// Proves the positional-cursor resume of cloudsync_payload_chunks: paging the +// window one chunk per call with an O(1) (db_version, seq, frag_offset) seek +// yields byte-identical chunks to a single full-window scan. The dataset mixes a +// db_version split across chunks (row-boundary resumes, incl. resumes landing +// INSIDE a single committed version that the old since>db_version cursor could not +// express) with a value larger than the chunk budget (mid-fragment resumes). +bool do_test_payload_chunks_positional_resume (bool print_result, bool cleanup_databases) { + sqlite3 *db = NULL; + sqlite3_stmt *stmt = NULL; + test_payload_chunk *base = NULL; int base_count = 0, base_cap = 0; + test_payload_chunk *pos = NULL; int pos_count = 0, pos_cap = 0; + int64_t watermark = -1; + bool result = false; + int rc = SQLITE_OK; + + time_t timestamp = time(NULL); + int saved_counter = test_counter++; + + db = do_create_database_file(0, timestamp, saved_counter); + if (!db) goto finalize; + rc = sqlite3_exec(db, + "CREATE TABLE split_test (id TEXT PRIMARY KEY, body TEXT DEFAULT '');" + "SELECT cloudsync_init('split_test');" + "SELECT cloudsync_set('payload_max_chunk_size', '262144');", + NULL, NULL, NULL); + if (rc != SQLITE_OK) goto finalize; + + // tx1: ~500 medium rows in one transaction -> one db_version split across + // several v2 chunks (row-boundary resumes within a single version). + rc = sqlite3_exec(db, + "WITH RECURSIVE c(i) AS (SELECT 1 UNION ALL SELECT i+1 FROM c WHERE i < 500) " + "INSERT INTO split_test(id, body) SELECT printf('row-%04d', i), hex(randomblob(700)) FROM c;", + NULL, NULL, NULL); + if (rc != SQLITE_OK) goto finalize; + + // tx2: one value far larger than the chunk budget -> v3 fragments across + // several chunks (mid-fragment resumes inside a single value). + rc = sqlite3_exec(db, + "INSERT INTO split_test(id, body) VALUES ('big', hex(randomblob(900000)));", + NULL, NULL, NULL); + if (rc != SQLITE_OK) goto finalize; + + // Baseline: every chunk of the whole window, in order, via the legacy scan. + rc = sqlite3_prepare_v2(db, + "SELECT payload, watermark_db_version FROM cloudsync_payload_chunks " + "WHERE since_db_version=0 ORDER BY chunk_index;", -1, &stmt, NULL); + if (rc != SQLITE_OK) goto finalize; + while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) { + int len = sqlite3_column_bytes(stmt, 0); + const void *payload = sqlite3_column_blob(stmt, 0); + if (!payload || len <= 0) goto finalize; + watermark = sqlite3_column_int64(stmt, 1); + if (base_count == base_cap) { + int nc = base_cap ? base_cap * 2 : 8; + test_payload_chunk *t = realloc(base, sizeof(*t) * nc); + if (!t) goto finalize; + memset(t + base_cap, 0, sizeof(*t) * (nc - base_cap)); + base = t; base_cap = nc; + } + base[base_count].data = malloc(len); + if (!base[base_count].data) goto finalize; + memcpy(base[base_count].data, payload, len); + base[base_count].len = len; + ++base_count; + } + if (rc != SQLITE_DONE) goto finalize; + sqlite3_finalize(stmt); stmt = NULL; + + // Scenario must actually exercise multiple chunks (and thus resumes). + if (base_count < 4 || watermark <= 0) goto finalize; + + // Positional drain: one chunk per call, seeking to the cursor the previous + // chunk reported. until is the frozen watermark from the baseline. + rc = sqlite3_prepare_v2(db, + "SELECT payload, next_db_version, next_seq, next_frag_offset, is_final " + "FROM cloudsync_payload_chunks " + "WHERE until_db_version=?1 AND resume_db_version=?2 AND resume_seq=?3 AND resume_frag_offset=?4 " + "LIMIT 1;", -1, &stmt, NULL); + if (rc != SQLITE_OK) goto finalize; + + int64_t rdbv = 0, rseq = 0, rfrag = 0; + bool done = false; + bool saw_frag_resume = false; // a follow-up call actually resumed mid-value + // Hard cap guards against a resume bug looping forever. + for (int guard = 0; !done && guard <= base_count + 2; ++guard) { + if (rfrag > 0) saw_frag_resume = true; + sqlite3_reset(stmt); + sqlite3_bind_int64(stmt, 1, watermark); + sqlite3_bind_int64(stmt, 2, rdbv); + sqlite3_bind_int64(stmt, 3, rseq); + sqlite3_bind_int64(stmt, 4, rfrag); + rc = sqlite3_step(stmt); + if (rc != SQLITE_ROW) goto finalize; // every step before is_final must yield a chunk + int len = sqlite3_column_bytes(stmt, 0); + const void *payload = sqlite3_column_blob(stmt, 0); + if (!payload || len <= 0) goto finalize; + rdbv = sqlite3_column_int64(stmt, 1); + rseq = sqlite3_column_int64(stmt, 2); + rfrag = sqlite3_column_int64(stmt, 3); + done = sqlite3_column_int(stmt, 4) != 0; + if (pos_count == pos_cap) { + int nc = pos_cap ? pos_cap * 2 : 8; + test_payload_chunk *t = realloc(pos, sizeof(*t) * nc); + if (!t) goto finalize; + memset(t + pos_cap, 0, sizeof(*t) * (nc - pos_cap)); + pos = t; pos_cap = nc; + } + pos[pos_count].data = malloc(len); + if (!pos[pos_count].data) goto finalize; + memcpy(pos[pos_count].data, payload, len); + pos[pos_count].len = len; + ++pos_count; + } + sqlite3_finalize(stmt); stmt = NULL; + + // The positional drain must terminate exactly on is_final, reproduce the + // baseline chunk sequence byte-for-byte, and have actually exercised a + // mid-value (fragment) resume — not only row-boundary resumes. + if (!done || pos_count != base_count || !saw_frag_resume) goto finalize; + for (int i = 0; i < base_count; ++i) { + if (pos[i].len != base[i].len) goto finalize; + if (memcmp(pos[i].data, base[i].data, base[i].len) != 0) goto finalize; + } + + result = true; + +finalize: + if (!result && print_result) { + printf("do_test_payload_chunks_positional_resume error: %s (base=%d, pos=%d, watermark=%lld)\n", + db ? sqlite3_errmsg(db) : "no db", base_count, pos_count, (long long)watermark); + } + if (stmt) sqlite3_finalize(stmt); + test_payload_chunks_free(base, base_count); + test_payload_chunks_free(pos, pos_count); + if (db) close_db(db); + if (cleanup_databases) { + char path[256], walpath[300], shmpath[300]; + do_build_database_path(path, 0, timestamp, saved_counter); + snprintf(walpath, sizeof(walpath), "%s-wal", path); + snprintf(shmpath, sizeof(shmpath), "%s-shm", path); + file_delete_internal(path); + file_delete_internal(walpath); + file_delete_internal(shmpath); + } + return result; +} + // Exercises the server-side download spool: cloudsync_payload_spool_fill stages a // window's whole chunk stream once, and the /check path pages it out one chunk per // call. Verifies byte-identity with direct cloudsync_payload_chunks generation, @@ -13196,6 +13343,7 @@ int main (int argc, const char * argv[]) { result += test_report("Payload Chunks Large Values:", do_test_payload_chunks_large_values(print_result, cleanup_databases)); result += test_report("Payload Chunks Site Exclusion:", do_test_payload_chunks_site_exclusion(print_result, cleanup_databases)); result += test_report("Payload Chunks Split db_version:", do_test_payload_chunks_split_dbversion(print_result, cleanup_databases)); + result += test_report("Payload Chunks Positional Resume:", do_test_payload_chunks_positional_resume(print_result, cleanup_databases)); result += test_report("Payload Download Spool:", do_test_payload_spool(print_result, cleanup_databases)); // close local database