Skip to content

Commit 952e5a3

Browse files
committed
Add minimum block size for writing in RecordThread
1 parent 2f6e5d3 commit 952e5a3

4 files changed

Lines changed: 89 additions & 27 deletions

File tree

Source/Processors/RecordNode/DataQueue.cpp

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -300,9 +300,25 @@ const SynchronizedTimestampBuffer& DataQueue::getTimestampBufferReference() cons
300300
return m_FTSBuffer;
301301
}
302302

303+
int DataQueue::getNumSamplesReady() const
304+
{
305+
if (m_numChans == 0)
306+
return 0;
307+
308+
int minReady = INT_MAX;
309+
for (int chan = 0; chan < m_numChans; ++chan)
310+
{
311+
int ready = m_fifos.getUnchecked (chan)->getNumReady();
312+
if (ready < minReady)
313+
minReady = ready;
314+
}
315+
return (minReady == INT_MAX) ? 0 : minReady;
316+
}
317+
303318
bool DataQueue::startRead (std::vector<CircularBufferIndexes>& dataBufferIdxs,
304319
std::vector<CircularBufferIndexes>& timestampBufferIdxs,
305320
int64& sampleNumber,
321+
int nMin,
306322
int nMax)
307323
{
308324
//This should never happen, but it never hurts to be on the safe side.
@@ -324,8 +340,8 @@ bool DataQueue::startRead (std::vector<CircularBufferIndexes>& dataBufferIdxs,
324340
// Apply nMax limit to the minimum
325341
int samplesToRead = ((minSamplesAvailable > nMax) && (nMax > 0)) ? nMax : minSamplesAvailable;
326342

327-
// If no samples available on any channel, nothing to read
328-
if (samplesToRead == 0)
343+
// If not enough samples available (below nMin threshold), skip this read
344+
if (samplesToRead < nMin)
329345
{
330346
// Initialize all indexes to zero
331347
for (int chan = 0; chan < m_numChans; ++chan)

Source/Processors/RecordNode/DataQueue.h

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,18 +80,29 @@ class DataQueue
8080
* @param sampleNumber Starting sample number for this block
8181
* @return Maximum FIFO usage across all channels (0.0 to 1.0)
8282
*/
83-
float writeAllChannels (const AudioBuffer<float>& buffer,
83+
float writeAllChannels (const AudioBuffer<float>& buffer,
8484
const Array<int>& srcChannels,
85-
int nSamples,
85+
int nSamples,
8686
int64 sampleNumber);
8787

8888
/** Writes an array of timestamps for one stream */
8989
float writeSynchronizedTimestamps (double start, double step, int destChannel, int nSamples);
9090

91-
/** Start reading data for all channels in this stream */
91+
/** Returns the number of samples available to read (minimum across all channels) */
92+
int getNumSamplesReady() const;
93+
94+
/** Start reading data for all channels in this stream
95+
* @param dataBufferIdxs Output: indices for reading continuous data
96+
* @param timestampBufferIdxs Output: indices for reading timestamps
97+
* @param sampleNumber Output: sample number for the start of the read
98+
* @param nMin Minimum samples required - returns false if fewer available
99+
* @param nMax Maximum samples to read
100+
* @return true if read was started successfully, false otherwise
101+
*/
92102
bool startRead (std::vector<CircularBufferIndexes>& dataBufferIdxs,
93103
std::vector<CircularBufferIndexes>& timestampBufferIdxs,
94104
int64& sampleNumber,
105+
int nMin,
95106
int nMax);
96107

97108
/** Called when data read is finished */
@@ -120,8 +131,8 @@ class DataQueue
120131

121132
std::vector<int> m_readSamples;
122133
std::vector<int> m_readFTSSamples;
123-
std::vector<int64> m_sampleNumbers; // Per-stream sample numbers (one per block)
124-
int64 m_lastReadSampleNumber; // Last sample number read for the stream
134+
std::vector<int64> m_sampleNumbers; // Per-stream sample numbers (one per block)
135+
int64 m_lastReadSampleNumber; // Last sample number read for the stream
125136

126137
int m_numChans;
127138
int m_numFTSChans;

Source/Processors/RecordNode/RecordThread.cpp

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,18 @@ RecordThread::RecordThread (RecordNode* parentNode, RecordEngine* engine) : Thre
3131
m_engine (engine),
3232
recordNode (parentNode),
3333
m_receivedFirstBlock (false),
34-
m_cleanExit (true)
35-
//samplesWritten(0)
34+
m_cleanExit (true),
35+
m_minWriteSamples (BLOCK_DEFAULT_MIN_WRITE_SAMPLES),
36+
m_maxWriteSamples (BLOCK_DEFAULT_MAX_WRITE_SAMPLES)
3637
{
38+
// Ensure max >= min to avoid deadlock
39+
if (m_maxWriteSamples < m_minWriteSamples)
40+
{
41+
LOGC ("WARNING: MAX_WRITE_SAMPLES (", m_maxWriteSamples, ") < MIN_WRITE_SAMPLES (", m_minWriteSamples, "), setting MAX = MIN");
42+
m_maxWriteSamples = m_minWriteSamples;
43+
}
44+
45+
LOGD ("RecordThread initialized with MIN_WRITE_SAMPLES=", m_minWriteSamples, " MAX_WRITE_SAMPLES=", m_maxWriteSamples);
3746
}
3847

3948
RecordThread::~RecordThread()
@@ -137,15 +146,15 @@ void RecordThread::run()
137146
{
138147
int64 streamSampleNum = queue->getSampleNumberForBlock (0);
139148
m_perStreamSampleNumbers[streamIdx] = streamSampleNum;
140-
149+
141150
// Count channels in this stream and set sample numbers for all of them
142151
int numChannelsInStream = 0;
143152
for (int ch = 0; ch < m_numChannels; ch++)
144153
{
145154
if (m_timestampBufferChannelArray[ch] == streamIdx)
146155
numChannelsInStream++;
147156
}
148-
157+
149158
for (int i = 0; i < numChannelsInStream; i++)
150159
{
151160
if (globalChan < m_numChannels)
@@ -158,7 +167,7 @@ void RecordThread::run()
158167

159168
//3-Normal loop
160169
while (! threadShouldExit())
161-
writeData (BLOCK_MAX_WRITE_SAMPLES, BLOCK_MAX_WRITE_EVENTS, BLOCK_MAX_WRITE_SPIKES);
170+
writeData (m_minWriteSamples, m_maxWriteSamples, BLOCK_MAX_WRITE_EVENTS, BLOCK_MAX_WRITE_SPIKES);
162171

163172
//LOGD(__FUNCTION__, " Exiting record thread");
164173
//4-Before closing the thread, try to write the remaining samples
@@ -167,8 +176,8 @@ void RecordThread::run()
167176

168177
if (! closeEarly)
169178
{
170-
// flush the buffers
171-
writeData (BLOCK_MAX_WRITE_SAMPLES, BLOCK_MAX_WRITE_EVENTS, BLOCK_MAX_WRITE_SPIKES, true);
179+
// flush the buffers - use 0 for minSamples to write all remaining data
180+
writeData (0, m_maxWriteSamples, BLOCK_MAX_WRITE_EVENTS, BLOCK_MAX_WRITE_SPIKES, true);
172181

173182
//5-Close files
174183
m_engine->closeFiles();
@@ -179,14 +188,20 @@ void RecordThread::run()
179188
//LOGC("RecordThread received ", spikesReceived, " spikes and wrote ", spikesWritten, ".");
180189
}
181190

182-
void RecordThread::writeData (int maxSamples,
191+
void RecordThread::writeData (int minSamples,
192+
int maxSamples,
183193
int maxEvents,
184194
int maxSpikes,
185195
bool lastBlock)
186196
{
187197
int numStreams = m_dataQueues->size();
188198
int globalChannelOffset = 0;
189199

200+
// Use sample-based thresholds directly - they naturally scale with channel count:
201+
// High channels = large byte writes (efficient), Low channels = small byte writes (fine)
202+
int effectiveMinSamples = lastBlock ? 0 : minSamples;
203+
int effectiveMaxSamples = maxSamples;
204+
190205
// Process each stream independently - this avoids race conditions
191206
// between streams with different sample rates
192207
for (int streamIdx = 0; streamIdx < numStreams; streamIdx++)
@@ -207,12 +222,12 @@ void RecordThread::writeData (int maxSamples,
207222
}
208223

209224
m_perStreamDataIdxs[streamIdx].resize (numChannelsInStream);
210-
m_perStreamTimestampIdxs[streamIdx].resize (1); // One timestamp stream per queue
225+
m_perStreamTimestampIdxs[streamIdx].resize (1); // One timestamp stream per queue
211226

212227
// Get single sample number for this stream
213228
int64 streamSampleNumber = 0;
214229

215-
if (queue->startRead (m_perStreamDataIdxs[streamIdx], m_perStreamTimestampIdxs[streamIdx], streamSampleNumber, maxSamples))
230+
if (queue->startRead (m_perStreamDataIdxs[streamIdx], m_perStreamTimestampIdxs[streamIdx], streamSampleNumber, effectiveMinSamples, effectiveMaxSamples))
216231
{
217232
// Update global sample numbers using the stream's sample number
218233
m_perStreamSampleNumbers[streamIdx] = streamSampleNumber;

Source/Processors/RecordNode/RecordThread.h

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,17 @@
2929
#include "DataQueue.h"
3030
#include "EventQueue.h"
3131
#include <atomic>
32+
#include <chrono>
33+
#include <cstdlib>
3234

33-
#define BLOCK_MAX_WRITE_SAMPLES 4096
35+
// Block write size limits (in samples) - tune these for performance
36+
// Sample-based thresholds naturally scale with channel count:
37+
// - High channel count = large writes (efficient for high data rates)
38+
// - Low channel count = small writes (fine for low data rates, keeps latency low)
39+
//
40+
41+
#define BLOCK_DEFAULT_MIN_WRITE_SAMPLES 8192
42+
#define BLOCK_DEFAULT_MAX_WRITE_SAMPLES 16384
3443
#define BLOCK_MAX_WRITE_EVENTS 50000
3544
#define BLOCK_MAX_WRITE_SPIKES 50000
3645

@@ -79,8 +88,15 @@ class RecordThread : public Thread
7988
RecordNode* recordNode;
8089

8190
private:
82-
/** Writes continuous data from all per-stream queues */
83-
void writeData (int maxSamples,
91+
/** Writes continuous data from all per-stream queues
92+
* @param minSamples Minimum samples required before writing (0 = write any available)
93+
* @param maxSamples Maximum samples per write batch
94+
* @param maxEvents Maximum events to write
95+
* @param maxSpikes Maximum spikes to write
96+
* @param lastBlock If true, write all remaining data regardless of minSamples
97+
*/
98+
void writeData (int minSamples,
99+
int maxSamples,
84100
int maxEvents,
85101
int maxSpikes,
86102
bool lastBlock = false);
@@ -89,16 +105,16 @@ class RecordThread : public Thread
89105
Array<int> m_channelArray;
90106
Array<int> m_timestampBufferChannelArray;
91107

92-
OwnedArray<DataQueue>* m_dataQueues; // Array of per-stream queues
108+
OwnedArray<DataQueue>* m_dataQueues; // Array of per-stream queues
93109
EventMsgQueue* m_eventQueue;
94110
SpikeMsgQueue* m_spikeQueue;
95111

96112
std::atomic<bool> m_receivedFirstBlock;
97113
std::atomic<bool> m_cleanExit;
98114

99-
Array<int64> sampleNumbers; // Global sample numbers (indexed by global channel)
100-
std::vector<int64> m_perStreamSampleNumbers; // Per-stream sample numbers
101-
115+
Array<int64> sampleNumbers; // Global sample numbers (indexed by global channel)
116+
std::vector<int64> m_perStreamSampleNumbers; // Per-stream sample numbers
117+
102118
// Per-stream buffer index arrays for independent queue reading
103119
std::vector<std::vector<CircularBufferIndexes>> m_perStreamDataIdxs;
104120
std::vector<std::vector<CircularBufferIndexes>> m_perStreamTimestampIdxs;
@@ -107,15 +123,19 @@ class RecordThread : public Thread
107123
int spikesWritten;
108124

109125
// Batch write support - pre-allocated buffers for grouping channels by stream
110-
std::vector<int> m_batchWriteChannels; // Write channel indices for current batch
111-
std::vector<int> m_batchRealChannels; // Real channel indices for current batch
112-
std::vector<const float*> m_batchDataPtrs; // Data buffer pointers for current batch
126+
std::vector<int> m_batchWriteChannels; // Write channel indices for current batch
127+
std::vector<int> m_batchRealChannels; // Real channel indices for current batch
128+
std::vector<const float*> m_batchDataPtrs; // Data buffer pointers for current batch
113129

114130
File m_rootFolder;
115131
int m_experimentNumber;
116132
int m_recordingNumber;
117133
int m_numChannels;
118134

135+
// Block size limits (in samples)
136+
int m_minWriteSamples; // Minimum samples before writing
137+
int m_maxWriteSamples; // Maximum samples per write batch
138+
119139
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (RecordThread);
120140
};
121141

0 commit comments

Comments
 (0)