Skip to content

Commit e9c618e

Browse files
committed
Optimize DataQueue and RecordThread for per-stream sample number handling
1 parent 99f98ed commit e9c618e

5 files changed

Lines changed: 90 additions & 82 deletions

File tree

Source/Processors/RecordNode/DataQueue.cpp

Lines changed: 55 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -67,20 +67,20 @@ void DataQueue::setChannelCount (int nChans)
6767
m_readSamples.clear();
6868
m_numChans = nChans;
6969
m_sampleNumbers.clear();
70-
m_lastReadSampleNumbers.clear();
70+
m_lastReadSampleNumber = 0;
7171

7272
for (int i = 0; i < nChans; ++i)
7373
{
7474
m_fifos.add (new AbstractFifo (m_maxSize));
7575
m_readSamples.push_back (0);
76-
m_sampleNumbers.add (new std::vector<int64>());
76+
}
7777

78-
for (int j = 0; j < m_numBlocks; j++)
79-
{
80-
m_sampleNumbers.getLast()->push_back (0);
81-
}
82-
m_lastReadSampleNumbers.push_back (0);
78+
// Initialize per-stream sample numbers (one per block)
79+
for (int j = 0; j < m_numBlocks; j++)
80+
{
81+
m_sampleNumbers.push_back (0);
8382
}
83+
8484
m_buffer.setSize (nChans, m_maxSize);
8585
}
8686

@@ -98,10 +98,12 @@ void DataQueue::resize (int nBlocks)
9898
m_fifos[i]->setTotalSize (size);
9999
m_fifos[i]->reset();
100100
m_readSamples[i] = 0;
101-
m_sampleNumbers[i]->resize (nBlocks);
102-
m_lastReadSampleNumbers[i] = 0;
103101
}
104102

103+
// Resize per-stream sample numbers
104+
m_sampleNumbers.resize (nBlocks);
105+
m_lastReadSampleNumber = 0;
106+
105107
m_readFTSSamples.clear();
106108

107109
for (int i = 0; i < m_numFTSChans; ++i)
@@ -114,7 +116,7 @@ void DataQueue::resize (int nBlocks)
114116
m_FTSBuffer.setSize (m_numFTSChans, size);
115117
}
116118

117-
void DataQueue::fillSampleNumbers (int channel, int index, int size, int64 sampleNumber)
119+
void DataQueue::fillSampleNumbers (int index, int size, int64 sampleNumber)
118120
{
119121
//Search for the next block start.
120122
int blockMod = index % m_blockSize;
@@ -142,7 +144,7 @@ void DataQueue::fillSampleNumbers (int channel, int index, int size, int64 sampl
142144
if ((blockStartPos + i) < (index + size))
143145
{
144146
latestSampleNumber = startSampleNumber + (i * m_blockSize);
145-
m_sampleNumbers[channel]->at (blockIdx) = latestSampleNumber;
147+
m_sampleNumbers[blockIdx] = latestSampleNumber;
146148
}
147149
}
148150
}
@@ -202,7 +204,11 @@ float DataQueue::writeChannel (const AudioBuffer<float>& buffer,
202204
0,
203205
size1);
204206

205-
fillSampleNumbers (destChannel, index1, size1, sampleNumber);
207+
// Only fill sample numbers once per stream (using first channel write)
208+
if (destChannel == 0)
209+
{
210+
fillSampleNumbers (index1, size1, sampleNumber);
211+
}
206212

207213
if (size2 > 0)
208214
{
@@ -213,7 +219,10 @@ float DataQueue::writeChannel (const AudioBuffer<float>& buffer,
213219
size1,
214220
size2);
215221

216-
fillSampleNumbers (destChannel, index2, size2, sampleNumber + size1);
222+
if (destChannel == 0)
223+
{
224+
fillSampleNumbers (index2, size2, sampleNumber + size1);
225+
}
217226
}
218227
m_fifos[destChannel]->finishedWrite (size1 + size2);
219228

@@ -239,11 +248,11 @@ float DataQueue::writeAllChannels (const AudioBuffer<float>& buffer,
239248

240249
float maxUsage = 0.0f;
241250

242-
// Fill sample numbers once for channel 0
243-
fillSampleNumbers (0, index1, size1, sampleNumber);
251+
// Fill sample numbers once for the stream (not per-channel)
252+
fillSampleNumbers (index1, size1, sampleNumber);
244253
if (size2 > 0)
245254
{
246-
fillSampleNumbers (0, index2, size2, sampleNumber + size1);
255+
fillSampleNumbers (index2, size2, sampleNumber + size1);
247256
}
248257

249258
// Batch copy and update all channels
@@ -260,14 +269,6 @@ float DataQueue::writeAllChannels (const AudioBuffer<float>& buffer,
260269
m_buffer.copyFrom (destChannel, index2, buffer, srcChannel, size1, size2);
261270
}
262271

263-
// Copy sample numbers from channel 0 (faster than calling fillSampleNumbers for each)
264-
if (destChannel > 0)
265-
{
266-
std::memcpy (m_sampleNumbers[destChannel]->data(),
267-
m_sampleNumbers[0]->data(),
268-
m_numBlocks * sizeof (int64));
269-
}
270-
271272
// Update FIFO state - need to call prepareToWrite for channels > 0
272273
if (destChannel > 0)
273274
{
@@ -278,7 +279,8 @@ float DataQueue::writeAllChannels (const AudioBuffer<float>& buffer,
278279
}
279280

280281
// Return usage from last channel (all should be the same)
281-
return 1.0f - (float) m_fifos[m_numChans - 1]->getFreeSpace() / (float) m_fifos[m_numChans - 1]->getTotalSize();
282+
const float usage = 1.0f - (float) m_fifos[m_numChans - 1]->getFreeSpace() / (float) m_fifos[m_numChans - 1]->getTotalSize();
283+
return usage;
282284
}
283285

284286
/*
@@ -300,7 +302,7 @@ const SynchronizedTimestampBuffer& DataQueue::getTimestampBufferReference() cons
300302

301303
bool DataQueue::startRead (std::vector<CircularBufferIndexes>& dataBufferIdxs,
302304
std::vector<CircularBufferIndexes>& timestampBufferIdxs,
303-
Array<int64>& sampleNumbers,
305+
int64& sampleNumber,
304306
int nMax)
305307
{
306308
//This should never happen, but it never hurts to be on the safe side.
@@ -344,37 +346,39 @@ bool DataQueue::startRead (std::vector<CircularBufferIndexes>& dataBufferIdxs,
344346
idx.size2 = 0;
345347
m_readFTSSamples[chan] = 0;
346348
}
349+
sampleNumber = m_lastReadSampleNumber;
347350
m_readInProgress = false;
348351
return false;
349352
}
350353

351-
// Second pass: read the same number of samples from all channels
352-
for (int chan = 0; chan < m_numChans; ++chan)
353-
{
354-
CircularBufferIndexes& idx = dataBufferIdxs[chan];
354+
// Get sample number for the stream (using first channel's indices)
355+
CircularBufferIndexes& firstIdx = dataBufferIdxs[0];
356+
m_fifos.getUnchecked (0)->prepareToRead (samplesToRead, firstIdx.index1, firstIdx.size1, firstIdx.index2, firstIdx.size2);
357+
m_readSamples[0] = firstIdx.size1 + firstIdx.size2;
355358

356-
m_fifos.getUnchecked (chan)->prepareToRead (samplesToRead, idx.index1, idx.size1, idx.index2, idx.size2);
357-
m_readSamples[chan] = idx.size1 + idx.size2;
359+
int blockMod = firstIdx.index1 % m_blockSize;
360+
int blockDiff = (blockMod == 0) ? 0 : (m_blockSize - blockMod);
358361

359-
int blockMod = idx.index1 % m_blockSize;
360-
int blockDiff = (blockMod == 0) ? 0 : (m_blockSize - blockMod);
362+
// If the next sample number block is within the data we're reading, include the translated sample number
363+
if (blockDiff < (firstIdx.size1 + firstIdx.size2))
364+
{
365+
int blockIdx = ((firstIdx.index1 + blockDiff) / m_blockSize) % m_numBlocks;
366+
sampleNumber = m_sampleNumbers[blockIdx] - blockDiff;
367+
}
368+
else
369+
{
370+
// If not, use the last sent sample number
371+
sampleNumber = m_lastReadSampleNumber;
372+
}
361373

362-
//If the next sample number block is within the data we're reading, include the translated sample number in the output
363-
int64 sampleNum;
374+
m_lastReadSampleNumber = sampleNumber + firstIdx.size1 + firstIdx.size2;
364375

365-
if (blockDiff < (idx.size1 + idx.size2))
366-
{
367-
int blockIdx = ((idx.index1 + blockDiff) / m_blockSize) % m_numBlocks;
368-
sampleNum = m_sampleNumbers[chan]->at (blockIdx) - blockDiff;
369-
}
370-
//If not, copy the last sent again
371-
else
372-
{
373-
sampleNum = m_lastReadSampleNumbers[chan];
374-
}
375-
376-
sampleNumbers.set (chan, sampleNum); // expensive operation?
377-
m_lastReadSampleNumbers[chan] = sampleNum + idx.size1 + idx.size2;
376+
// Read remaining channels with same parameters
377+
for (int chan = 1; chan < m_numChans; ++chan)
378+
{
379+
CircularBufferIndexes& idx = dataBufferIdxs[chan];
380+
m_fifos.getUnchecked (chan)->prepareToRead (samplesToRead, idx.index1, idx.size1, idx.index2, idx.size2);
381+
m_readSamples[chan] = idx.size1 + idx.size2;
378382
}
379383

380384
// Also find minimum for timestamp streams and read consistently
@@ -418,11 +422,7 @@ void DataQueue::stopRead()
418422
m_readInProgress = false;
419423
}
420424

421-
void DataQueue::getSampleNumbersForBlock (int idx, Array<int64>& sampleNumbers) const
425+
int64 DataQueue::getSampleNumberForBlock (int idx) const
422426
{
423-
sampleNumbers.clear();
424-
for (int chan = 0; chan < m_numChans; ++chan)
425-
{
426-
sampleNumbers.add ((*m_sampleNumbers[chan])[idx]);
427-
}
427+
return m_sampleNumbers[idx];
428428
}

Source/Processors/RecordNode/DataQueue.h

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ class DataQueue
5252
~DataQueue();
5353

5454
/// ----------- NOT THREAD SAFE -------------- //
55-
/** Sets the number of continuous channel buffers needed */
55+
/** Sets the number of continuous channel buffers needed (all channels belong to one stream) */
5656
void setChannelCount (int nChans);
5757

5858
/** Sets the number of timestamp buffers needed */
@@ -61,8 +61,8 @@ class DataQueue
6161
/** Changes the number of blocks in the queue */
6262
void resize (int nBlocks);
6363

64-
/** Returns an array of sample numbers for a given block*/
65-
void getSampleNumbersForBlock (int idx, Array<int64>& sampleNumbers) const;
64+
/** Returns the sample number for a given block (same for all channels in stream) */
65+
int64 getSampleNumberForBlock (int idx) const;
6666

6767
/// ----------- THREAD SAFE -------------- //
6868

@@ -88,10 +88,10 @@ class DataQueue
8888
/** Writes an array of timestamps for one stream */
8989
float writeSynchronizedTimestamps (double start, double step, int destChannel, int nSamples);
9090

91-
/** Start reading data for one channel */
91+
/** Start reading data for all channels in this stream */
9292
bool startRead (std::vector<CircularBufferIndexes>& dataBufferIdxs,
9393
std::vector<CircularBufferIndexes>& timestampBufferIdxs,
94-
Array<int64>& sampleNumbers,
94+
int64& sampleNumber,
9595
int nMax);
9696

9797
/** Called when data read is finished */
@@ -107,8 +107,8 @@ class DataQueue
107107
int getBlockSize();
108108

109109
private:
110-
/** Fills the sample number buffer for a given channel */
111-
void fillSampleNumbers (int channel, int index, int size, int64 sampleNumber);
110+
/** Fills the sample number buffer for the stream */
111+
void fillSampleNumbers (int index, int size, int64 sampleNumber);
112112

113113
int lastIdx;
114114

@@ -120,8 +120,8 @@ class DataQueue
120120

121121
std::vector<int> m_readSamples;
122122
std::vector<int> m_readFTSSamples;
123-
OwnedArray<std::vector<int64>> m_sampleNumbers;
124-
std::vector<int64> m_lastReadSampleNumbers;
123+
std::vector<int64> m_sampleNumbers; // Per-stream sample numbers (one per block)
124+
int64 m_lastReadSampleNumber; // Last sample number read for the stream
125125

126126
int m_numChans;
127127
int m_numFTSChans;

Source/Processors/RecordNode/RecordNode.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -929,7 +929,7 @@ void RecordNode::startRecording()
929929
}
930930

931931
recordThread->setFileComponents (rootFolder, experimentNumber, recordingNumber);
932-
recordThread->startThread();
932+
recordThread->startThread(Thread::Priority::highest);
933933
isRecording = true;
934934

935935
if (settingsNeeded)

Source/Processors/RecordNode/RecordThread.cpp

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ void RecordThread::run()
103103
int numStreams = recordNode->getNumDataStreams();
104104
m_perStreamDataIdxs.resize (numStreams);
105105
m_perStreamTimestampIdxs.resize (numStreams);
106+
m_perStreamSampleNumbers.resize (numStreams, 0);
106107

107108
bool closeEarly = true;
108109

@@ -131,12 +132,21 @@ void RecordThread::run()
131132
DataQueue* queue = (*m_dataQueues)[streamIdx];
132133
if (queue != nullptr)
133134
{
134-
Array<int64> streamSampleNums;
135-
queue->getSampleNumbersForBlock (0, streamSampleNums);
136-
for (int i = 0; i < streamSampleNums.size(); i++)
135+
int64 streamSampleNum = queue->getSampleNumberForBlock (0);
136+
m_perStreamSampleNumbers[streamIdx] = streamSampleNum;
137+
138+
// Count channels in this stream and set sample numbers for all of them
139+
int numChannelsInStream = 0;
140+
for (int ch = 0; ch < m_numChannels; ch++)
141+
{
142+
if (m_timestampBufferChannelArray[ch] == streamIdx)
143+
numChannelsInStream++;
144+
}
145+
146+
for (int i = 0; i < numChannelsInStream; i++)
137147
{
138148
if (globalChan < m_numChannels)
139-
initSampleNumbers.add (streamSampleNums[i]);
149+
initSampleNumbers.add (streamSampleNum);
140150
globalChan++;
141151
}
142152
}
@@ -196,21 +206,18 @@ void RecordThread::writeData (int maxSamples,
196206
m_perStreamDataIdxs[streamIdx].resize (numChannelsInStream);
197207
m_perStreamTimestampIdxs[streamIdx].resize (1); // One timestamp stream per queue
198208

199-
// Create local sample numbers array for this stream
200-
Array<int64> streamSampleNumbers;
201-
for (int i = 0; i < numChannelsInStream; i++)
202-
streamSampleNumbers.add (0);
209+
// Get single sample number for this stream
210+
int64 streamSampleNumber = 0;
203211

204-
if (queue->startRead (m_perStreamDataIdxs[streamIdx], m_perStreamTimestampIdxs[streamIdx], streamSampleNumbers, maxSamples))
212+
if (queue->startRead (m_perStreamDataIdxs[streamIdx], m_perStreamTimestampIdxs[streamIdx], streamSampleNumber, maxSamples))
205213
{
206-
// Update global sample numbers
207-
int localChan = 0;
214+
// Update global sample numbers using the stream's sample number
215+
m_perStreamSampleNumbers[streamIdx] = streamSampleNumber;
208216
for (int globalChan = 0; globalChan < m_numChannels; globalChan++)
209217
{
210218
if (m_timestampBufferChannelArray[globalChan] == streamIdx)
211219
{
212-
sampleNumbers.set (globalChan, streamSampleNumbers[localChan]);
213-
localChan++;
220+
sampleNumbers.set (globalChan, streamSampleNumber);
214221
}
215222
}
216223
m_engine->updateLatestSampleNumbers (sampleNumbers);
@@ -271,14 +278,14 @@ void RecordThread::writeData (int maxSamples,
271278
int numSamples2 = m_perStreamDataIdxs[streamIdx][0].size2;
272279
int bufferIndex2 = m_perStreamDataIdxs[streamIdx][0].index2;
273280

274-
// Update sample numbers
275-
localChan = 0;
281+
// Update sample numbers using per-stream value
282+
int64 updatedStreamSampleNumber = m_perStreamSampleNumbers[streamIdx] + numSamples;
283+
m_perStreamSampleNumbers[streamIdx] = updatedStreamSampleNumber;
276284
for (int globalChan = 0; globalChan < m_numChannels; globalChan++)
277285
{
278286
if (m_timestampBufferChannelArray[globalChan] == streamIdx)
279287
{
280-
sampleNumbers.set (globalChan, sampleNumbers[globalChan] + numSamples);
281-
localChan++;
288+
sampleNumbers.set (globalChan, updatedStreamSampleNumber);
282289
}
283290
}
284291
m_engine->updateLatestSampleNumbers (sampleNumbers, m_batchWriteChannels[0]);

Source/Processors/RecordNode/RecordThread.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ class RecordThread : public Thread
9797
std::atomic<bool> m_cleanExit;
9898

9999
Array<int64> sampleNumbers; // Global sample numbers (indexed by global channel)
100+
std::vector<int64> m_perStreamSampleNumbers; // Per-stream sample numbers
100101

101102
// Per-stream buffer index arrays for independent queue reading
102103
std::vector<std::vector<CircularBufferIndexes>> m_perStreamDataIdxs;

0 commit comments

Comments
 (0)