Skip to content

Commit 8b1d2f0

Browse files
committed
feat: remove SET threads=1 requirement for table functions on DuckDB >= 1.5.0
1 parent 5327b98 commit 8b1d2f0

6 files changed

Lines changed: 67 additions & 22 deletions

File tree

lib/duckdb/connection.rb

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -236,12 +236,11 @@ def register_table_function(table_function)
236236
# @param columns [Hash{String => DuckDB::LogicalType}, nil] optional column schema override;
237237
# if omitted, the adapter determines the columns (e.g. from headers or inference)
238238
# @raise [ArgumentError] if no adapter is registered for the object's class
239-
# @raise [DuckDB::Error] if threads setting is not 1
239+
# @raise [DuckDB::Error] if threads > 1 on DuckDB < 1.5.0
240240
# @return [void]
241241
#
242242
# @example Expose a CSV as a table
243243
# require 'csv'
244-
# con.execute('SET threads=1')
245244
# DuckDB::TableFunction.add_table_adapter(CSV, CSVTableAdapter.new)
246245
# csv = CSV.new(File.read('data.csv'), headers: true)
247246
# con.expose_as_table(csv, 'csv_table')
@@ -263,16 +262,22 @@ def expose_as_table(object, name, columns: nil)
263262

264263
private
265264

265+
# DuckDB >= 1.5.0 provides per-worker proxy threads via init_local_state,
266+
# making table function callbacks thread-safe with multiple DuckDB threads.
267+
# On older versions, the global executor serializes all callbacks and can
268+
# deadlock under concurrent workloads, so we require threads=1 (raising DuckDB::Error otherwise).
266269
def check_threads
270+
return if Gem::Version.new(LIBRARY_VERSION) >= Gem::Version.new('1.5.0')
271+
267272
result = execute("SELECT current_setting('threads')")
268273
thread_count = result.first.first.to_i
269274

270275
return unless thread_count > 1
271276

272277
raise DuckDB::Error,
273-
'Functions with Ruby callbacks require single-threaded execution. ' \
274-
"Current threads setting: #{thread_count}. " \
275-
"Execute 'SET threads=1' before registering functions."
278+
'Table functions with Ruby callbacks require single-threaded execution ' \
279+
"on DuckDB < 1.5.0. Current threads setting: #{thread_count}. " \
280+
"Execute 'SET threads=1' before registering table functions."
276281
end
277282

278283
def run_appender_block(appender, &)

test/duckdb_test/data_chunk_test.rb

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ def test_data_chunk_get_vector
5656

5757
# Test 4: Vector#logical_type returns LogicalType
5858
def test_vector_logical_type # rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Minitest/MultipleAssertions
59-
@conn.execute('SET threads=1')
59+
@conn.execute('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0')
6060

6161
table_function = DuckDB::TableFunction.new
6262
table_function.name = 'test_vector_type'
@@ -98,7 +98,7 @@ def test_vector_logical_type # rubocop:disable Metrics/AbcSize, Metrics/MethodLe
9898

9999
# Test 5: DataChunk#set_value with INTEGER
100100
def test_data_chunk_set_value_integer # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
101-
@conn.execute('SET threads=1')
101+
@conn.execute('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0')
102102

103103
done = false
104104
table_function = DuckDB::TableFunction.new
@@ -133,7 +133,7 @@ def test_data_chunk_set_value_integer # rubocop:disable Metrics/AbcSize, Metrics
133133

134134
# Test 6: DataChunk#set_value with BIGINT
135135
def test_data_chunk_set_value_bigint # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
136-
@conn.execute('SET threads=1')
136+
@conn.execute('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0')
137137

138138
done = false
139139
table_function = DuckDB::TableFunction.new
@@ -165,7 +165,7 @@ def test_data_chunk_set_value_bigint # rubocop:disable Metrics/AbcSize, Metrics/
165165

166166
# Test 7: DataChunk#set_value with VARCHAR
167167
def test_data_chunk_set_value_varchar # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
168-
@conn.execute('SET threads=1')
168+
@conn.execute('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0')
169169

170170
done = false
171171
table_function = DuckDB::TableFunction.new
@@ -199,7 +199,7 @@ def test_data_chunk_set_value_varchar # rubocop:disable Metrics/AbcSize, Metrics
199199

200200
# Test 8: DataChunk#set_value with DOUBLE
201201
def test_data_chunk_set_value_double # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
202-
@conn.execute('SET threads=1')
202+
@conn.execute('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0')
203203

204204
done = false
205205
table_function = DuckDB::TableFunction.new
@@ -233,7 +233,7 @@ def test_data_chunk_set_value_double # rubocop:disable Metrics/AbcSize, Metrics/
233233

234234
# Test 9: DataChunk#set_value with NULL
235235
def test_data_chunk_set_value_null # rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Minitest/MultipleAssertions
236-
@conn.execute('SET threads=1')
236+
@conn.execute('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0')
237237

238238
done = false
239239
table_function = DuckDB::TableFunction.new
@@ -269,7 +269,7 @@ def test_data_chunk_set_value_null # rubocop:disable Metrics/AbcSize, Metrics/Me
269269

270270
# Test 10: DataChunk#set_value with BLOB
271271
def test_data_chunk_set_value_blob # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
272-
@conn.execute('SET threads=1')
272+
@conn.execute('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0')
273273

274274
done = false
275275
table_function = DuckDB::TableFunction.new
@@ -306,7 +306,7 @@ def test_data_chunk_set_value_blob # rubocop:disable Metrics/AbcSize, Metrics/Me
306306
# Test 11: DataChunk#set_value with multiple columns
307307
# rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Minitest/MultipleAssertions
308308
def test_data_chunk_set_value_multiple_columns
309-
@conn.execute('SET threads=1')
309+
@conn.execute('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0')
310310

311311
done = false
312312
table_function = DuckDB::TableFunction.new
@@ -361,7 +361,7 @@ def test_data_chunk_set_value_multiple_columns
361361

362362
# Test 12: DataChunk#set_value with TIMESTAMP
363363
def test_data_chunk_set_value_timestamp # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
364-
@conn.execute('SET threads=1')
364+
@conn.execute('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0')
365365

366366
done = false
367367
table_function = DuckDB::TableFunction.new
@@ -398,7 +398,7 @@ def test_data_chunk_set_value_timestamp # rubocop:disable Metrics/AbcSize, Metri
398398

399399
# Test 13: DataChunk#set_value with TIMESTAMP_TZ
400400
def test_data_chunk_set_value_timestamp_tz # rubocop:disable Metrics/AbcSize, Metrics/MethodLength, Minitest/MultipleAssertions
401-
@conn.execute('SET threads=1')
401+
@conn.execute('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0')
402402

403403
done = false
404404
table_function = DuckDB::TableFunction.new

test/duckdb_test/gc_stress_test.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ def test_table_function_with_gc_compaction
8383
skip 'GC.compact not available' unless GC.respond_to?(:compact)
8484
skip 'GC.compact hangs on Windows in parallel test execution' if Gem.win_platform?
8585

86-
@con.execute('SET threads=1') # Table functions still require single-threaded execution
86+
@con.execute('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0')
8787

8888
# Capture local variables
8989
multiplier = 3
@@ -136,7 +136,7 @@ def test_mixed_functions_gc_stress
136136
skip 'GC.compact not available' unless GC.respond_to?(:compact)
137137
skip 'GC.compact hangs on Windows in parallel test execution' if Gem.win_platform?
138138

139-
@con.execute('SET threads=1') # Table functions still require single-threaded execution
139+
@con.execute('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0')
140140

141141
# Register both scalar and table functions
142142
@con.register_scalar_function(DuckDB::ScalarFunction.new.tap do |sf|

test/duckdb_test/table_function_csv_test.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,9 @@ def csv_to_duckdb_data(csv, output)
5050
def setup
5151
@db = DuckDB::Database.open
5252
@con = @db.connect
53-
@con.execute('SET threads=1') # Required for Ruby callbacks
53+
return unless Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0')
54+
55+
@con.execute('SET threads=1')
5456
end
5557

5658
def teardown

test/duckdb_test/table_function_integration_test.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ class TableFunctionIntegrationTest < Minitest::Test
77
def setup
88
@database = DuckDB::Database.open
99
@connection = @database.connect
10-
@connection.execute('SET threads=1') # Required for Ruby callbacks
10+
return unless Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0')
11+
12+
@connection.execute('SET threads=1')
1113
end
1214

1315
def teardown

test/duckdb_test/table_function_test.rb

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ def test_new
1616
def test_create_with_set_value
1717
db = DuckDB::Database.open
1818
conn = db.connect
19-
conn.query('SET threads=1')
19+
conn.query('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0')
2020

2121
called = 0
2222

@@ -92,7 +92,7 @@ def test_gc_compaction_safety
9292

9393
db = DuckDB::Database.open
9494
conn = db.connect
95-
conn.query('SET threads=1')
95+
conn.query('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0')
9696

9797
# Capture local variable in callbacks
9898
row_multiplier = 2
@@ -158,7 +158,7 @@ def test_gc_compaction_safety
158158
def test_symbol_columns
159159
db = DuckDB::Database.open
160160
conn = db.connect
161-
conn.query('SET threads=1')
161+
conn.query('SET threads=1') if Gem::Version.new(DuckDB::LIBRARY_VERSION) < Gem::Version.new('1.5.0')
162162

163163
# Capture local variable in callbacks
164164
row_multiplier = 2
@@ -206,6 +206,42 @@ def test_symbol_columns
206206
end
207207
# rubocop:enable Metrics/AbcSize, Metrics/MethodLength
208208

209+
def test_table_function_with_multithread # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
210+
unless Gem::Version.new(DuckDB::LIBRARY_VERSION) >= Gem::Version.new('1.5.0')
211+
skip 'per-worker proxy requires duckdb >= 1.5.0'
212+
end
213+
214+
db = DuckDB::Database.open
215+
conn = db.connect
216+
conn.execute('SET threads=4')
217+
218+
called = 0
219+
tf = DuckDB::TableFunction.new
220+
tf.name = 'mt_generate'
221+
tf.bind do |bind_info|
222+
bind_info.add_result_column('value', DuckDB::LogicalType::INTEGER)
223+
end
224+
tf.init { |_init_info| } # rubocop:disable Lint/EmptyBlock
225+
tf.execute do |_func_info, output|
226+
if called.zero?
227+
100.times { |i| output.set_value(0, i, i * 2) }
228+
output.size = 100
229+
called += 1
230+
else
231+
output.size = 0
232+
end
233+
end
234+
235+
conn.register_table_function(tf)
236+
result = conn.execute('SELECT SUM(value) FROM mt_generate()')
237+
238+
# sum(0, 2, 4, ..., 198) = 2 * sum(0..99) = 2 * 4950 = 9900
239+
assert_equal 9900, result.first.first
240+
241+
conn.disconnect
242+
db.close
243+
end
244+
209245
private
210246

211247
def setup_incomplete_function

0 commit comments

Comments
 (0)