Skip to content

Commit c70fdae

Browse files
committed
refactor: add three-path dispatch to table function callbacks
Refactor table_function.c bind/init/execute callbacks to use the same three-path dispatch pattern as scalar_function.c: 1. Ruby thread WITH GVL -> call directly 2. Ruby thread WITHOUT GVL -> rb_thread_call_with_gvl 3. Non-Ruby thread -> dispatch to global executor Each callback's core logic is extracted into a *_with_gvl() function (table_bind_with_gvl, table_init_with_gvl, table_execute_with_gvl) that the dispatch paths invoke. The global executor is started when the execute callback is registered. This makes table function callbacks thread-safe when invoked from non-Ruby threads. The SET threads=1 restriction is not yet removed (done in a subsequent commit).
1 parent a13e517 commit c70fdae

1 file changed

Lines changed: 112 additions & 32 deletions

File tree

ext/duckdb/table_function.c

Lines changed: 112 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
#include "ruby-duckdb.h"
22

3+
/*
4+
* Thread detection functions (available since Ruby 2.3).
5+
*/
6+
extern int ruby_thread_has_gvl_p(void);
7+
extern int ruby_native_thread_p(void);
8+
39
VALUE cDuckDBTableFunction;
410
extern VALUE cDuckDBTableFunctionBindInfo;
511
extern VALUE cDuckDBTableFunctionInitInfo;
@@ -217,32 +223,55 @@ static VALUE call_bind_proc(VALUE arg) {
217223
return rb_funcall(args[0], rb_intern("call"), 1, args[1]);
218224
}
219225

220-
static void table_function_bind_callback(duckdb_bind_info info) {
226+
/* Bind callback core logic — must be called with GVL held */
227+
struct table_bind_arg {
228+
duckdb_bind_info info;
221229
rubyDuckDBTableFunction *ctx;
230+
};
231+
232+
static void table_bind_with_gvl(void *data) {
233+
struct table_bind_arg *arg = (struct table_bind_arg *)data;
222234
rubyDuckDBBindInfo *bind_info_ctx;
223235
VALUE bind_info_obj;
224236
int state = 0;
225237

226-
// Get the C struct pointer (safe with GC compaction)
227-
ctx = (rubyDuckDBTableFunction *)duckdb_bind_get_extra_info(info);
228-
if (!ctx || ctx->bind_proc == Qnil) {
238+
if (!arg->ctx || arg->ctx->bind_proc == Qnil) {
229239
return;
230240
}
231241

232-
// Create BindInfo wrapper
233242
bind_info_obj = rb_class_new_instance(0, NULL, cDuckDBTableFunctionBindInfo);
234243
bind_info_ctx = get_struct_bind_info(bind_info_obj);
235-
bind_info_ctx->bind_info = info;
244+
bind_info_ctx->bind_info = arg->info;
236245

237-
// Call Ruby block with exception protection
238-
VALUE call_args[2] = { ctx->bind_proc, bind_info_obj };
246+
VALUE call_args[2] = { arg->ctx->bind_proc, bind_info_obj };
239247
rb_protect(call_bind_proc, (VALUE)call_args, &state);
240248

241249
if (state) {
242250
VALUE err = rb_errinfo();
243251
VALUE msg = rb_funcall(err, rb_intern("message"), 0);
244-
duckdb_bind_set_error(info, StringValueCStr(msg));
245-
rb_set_errinfo(Qnil); // Clear the error
252+
duckdb_bind_set_error(arg->info, StringValueCStr(msg));
253+
rb_set_errinfo(Qnil);
254+
}
255+
}
256+
257+
static void *table_bind_gvl_wrapper(void *data) {
258+
table_bind_with_gvl(data);
259+
return NULL;
260+
}
261+
262+
static void table_function_bind_callback(duckdb_bind_info info) {
263+
struct table_bind_arg arg;
264+
arg.info = info;
265+
arg.ctx = (rubyDuckDBTableFunction *)duckdb_bind_get_extra_info(info);
266+
267+
if (ruby_native_thread_p()) {
268+
if (ruby_thread_has_gvl_p()) {
269+
table_bind_with_gvl(&arg);
270+
} else {
271+
rb_thread_call_with_gvl(table_bind_gvl_wrapper, &arg);
272+
}
273+
} else {
274+
rbduckdb_executor_dispatch(table_bind_with_gvl, &arg);
246275
}
247276
}
248277

@@ -281,32 +310,55 @@ static VALUE call_init_proc(VALUE args_val) {
281310
return rb_funcall(args[0], rb_intern("call"), 1, args[1]);
282311
}
283312

284-
static void table_function_init_callback(duckdb_init_info info) {
313+
/* Init callback core logic — must be called with GVL held */
314+
struct table_init_arg {
315+
duckdb_init_info info;
285316
rubyDuckDBTableFunction *ctx;
317+
};
318+
319+
static void table_init_with_gvl(void *data) {
320+
struct table_init_arg *arg = (struct table_init_arg *)data;
286321
VALUE init_info_obj;
287322
rubyDuckDBInitInfo *init_info_ctx;
288323
int state = 0;
289324

290-
// Get the C struct pointer (safe with GC compaction)
291-
ctx = (rubyDuckDBTableFunction *)duckdb_init_get_extra_info(info);
292-
if (!ctx || ctx->init_proc == Qnil) {
325+
if (!arg->ctx || arg->ctx->init_proc == Qnil) {
293326
return;
294327
}
295328

296-
// Create InitInfo wrapper
297329
init_info_obj = rb_class_new_instance(0, NULL, cDuckDBTableFunctionInitInfo);
298330
init_info_ctx = get_struct_init_info(init_info_obj);
299-
init_info_ctx->info = info;
331+
init_info_ctx->info = arg->info;
300332

301-
// Call Ruby block with exception protection
302-
VALUE call_args[2] = { ctx->init_proc, init_info_obj };
333+
VALUE call_args[2] = { arg->ctx->init_proc, init_info_obj };
303334
rb_protect(call_init_proc, (VALUE)call_args, &state);
304335

305336
if (state) {
306337
VALUE err = rb_errinfo();
307338
VALUE msg = rb_funcall(err, rb_intern("message"), 0);
308-
duckdb_init_set_error(info, StringValueCStr(msg));
309-
rb_set_errinfo(Qnil); // Clear the error
339+
duckdb_init_set_error(arg->info, StringValueCStr(msg));
340+
rb_set_errinfo(Qnil);
341+
}
342+
}
343+
344+
static void *table_init_gvl_wrapper(void *data) {
345+
table_init_with_gvl(data);
346+
return NULL;
347+
}
348+
349+
static void table_function_init_callback(duckdb_init_info info) {
350+
struct table_init_arg arg;
351+
arg.info = info;
352+
arg.ctx = (rubyDuckDBTableFunction *)duckdb_init_get_extra_info(info);
353+
354+
if (ruby_native_thread_p()) {
355+
if (ruby_thread_has_gvl_p()) {
356+
table_init_with_gvl(&arg);
357+
} else {
358+
rb_thread_call_with_gvl(table_init_gvl_wrapper, &arg);
359+
}
360+
} else {
361+
rbduckdb_executor_dispatch(table_init_with_gvl, &arg);
310362
}
311363
}
312364

@@ -335,6 +387,9 @@ static VALUE rbduckdb_table_function_set_execute(VALUE self) {
335387
ctx->execute_proc = rb_block_proc();
336388
duckdb_table_function_set_function(ctx->table_function, table_function_execute_callback);
337389

390+
/* Ensure the global executor thread is running for multi-thread dispatch */
391+
rbduckdb_executor_ensure_started();
392+
338393
return self;
339394
}
340395

@@ -343,39 +398,64 @@ static VALUE call_execute_proc(VALUE args_val) {
343398
return rb_funcall(args[0], rb_intern("call"), 2, args[1], args[2]);
344399
}
345400

346-
static void table_function_execute_callback(duckdb_function_info info, duckdb_data_chunk output) {
401+
/* Execute callback core logic — must be called with GVL held */
402+
struct table_execute_arg {
403+
duckdb_function_info info;
404+
duckdb_data_chunk output;
347405
rubyDuckDBTableFunction *ctx;
406+
};
407+
408+
static void table_execute_with_gvl(void *data) {
409+
struct table_execute_arg *arg = (struct table_execute_arg *)data;
348410
VALUE func_info_obj;
349411
VALUE data_chunk_obj;
350412
rubyDuckDBFunctionInfo *func_info_ctx;
351413
rubyDuckDBDataChunk *data_chunk_ctx;
352414
int state = 0;
353415

354-
// Get the C struct pointer (safe with GC compaction)
355-
ctx = (rubyDuckDBTableFunction *)duckdb_function_get_extra_info(info);
356-
if (!ctx || ctx->execute_proc == Qnil) {
416+
if (!arg->ctx || arg->ctx->execute_proc == Qnil) {
357417
return;
358418
}
359419

360-
// Create FunctionInfo wrapper
361420
func_info_obj = rb_class_new_instance(0, NULL, cDuckDBTableFunctionFunctionInfo);
362421
func_info_ctx = get_struct_function_info(func_info_obj);
363-
func_info_ctx->info = info;
422+
func_info_ctx->info = arg->info;
364423

365-
// Create DataChunk wrapper
366424
data_chunk_obj = rb_class_new_instance(0, NULL, cDuckDBDataChunk);
367425
data_chunk_ctx = get_struct_data_chunk(data_chunk_obj);
368-
data_chunk_ctx->data_chunk = output;
426+
data_chunk_ctx->data_chunk = arg->output;
369427

370-
// Call Ruby block with exception protection
371-
VALUE call_args[3] = { ctx->execute_proc, func_info_obj, data_chunk_obj };
428+
VALUE call_args[3] = { arg->ctx->execute_proc, func_info_obj, data_chunk_obj };
372429
rb_protect(call_execute_proc, (VALUE)call_args, &state);
373430

374431
if (state) {
375432
VALUE err = rb_errinfo();
376433
VALUE msg = rb_funcall(err, rb_intern("message"), 0);
377-
duckdb_function_set_error(info, StringValueCStr(msg));
378-
rb_set_errinfo(Qnil); // Clear the error
434+
duckdb_function_set_error(arg->info, StringValueCStr(msg));
435+
rb_set_errinfo(Qnil);
436+
}
437+
}
438+
439+
static void *table_execute_gvl_wrapper(void *data) {
440+
table_execute_with_gvl(data);
441+
return NULL;
442+
}
443+
444+
static void table_function_execute_callback(duckdb_function_info info, duckdb_data_chunk output) {
445+
struct table_execute_arg arg;
446+
arg.info = info;
447+
arg.output = output;
448+
arg.ctx = (rubyDuckDBTableFunction *)duckdb_function_get_extra_info(info);
449+
450+
if (ruby_native_thread_p()) {
451+
if (ruby_thread_has_gvl_p()) {
452+
table_execute_with_gvl(&arg);
453+
} else {
454+
rb_thread_call_with_gvl(table_execute_gvl_wrapper, &arg);
455+
}
456+
} else {
457+
/* Non-Ruby thread — dispatch to global executor */
458+
rbduckdb_executor_dispatch(table_execute_with_gvl, &arg);
379459
}
380460
}
381461

0 commit comments

Comments
 (0)