Skip to content

Commit f774629

Browse files
committed
Add writeAllChannels method to DataQueue
1 parent 9362b94 commit f774629

4 files changed

Lines changed: 119 additions & 19 deletions

File tree

Source/Processors/RecordNode/DataQueue.cpp

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,16 +158,22 @@ float DataQueue::writeSynchronizedTimestamps (double start, double step, int des
158158
LOGE (__FUNCTION__, " Recording Data Queue Overflow: sz1: ", size1, " sz2: ", size2, " nSamples: ", nSamples);
159159
}
160160

161+
// Get direct pointer access for faster writes
162+
double* writePtr = m_FTSBuffer.getWritePointer (destChannel);
163+
164+
// Fill first segment
161165
for (int i = 0; i < size1; i++)
162166
{
163-
m_FTSBuffer.setSample (destChannel, index1 + i, start + (double) i * step);
167+
writePtr[index1 + i] = start + (double) i * step;
164168
}
165169

170+
// Fill second segment (wrap-around) if present
166171
if (size2 > 0)
167172
{
173+
double offset = start + (double) size1 * step;
168174
for (int i = 0; i < size2; i++)
169175
{
170-
m_FTSBuffer.setSample (destChannel, index2 + i, start + (double) (size1 * step) + double (i * step));
176+
writePtr[index2 + i] = offset + (double) i * step;
171177
}
172178
}
173179

@@ -214,8 +220,69 @@ float DataQueue::writeChannel (const AudioBuffer<float>& buffer,
214220
return 1.0f - (float) m_fifos[destChannel]->getFreeSpace() / (float) m_fifos[destChannel]->getTotalSize();
215221
}
216222

223+
float DataQueue::writeAllChannels (const AudioBuffer<float>& buffer,
224+
const Array<int>& srcChannels,
225+
int nSamples,
226+
int64 sampleNumber)
227+
{
228+
if (m_numChans == 0 || nSamples == 0)
229+
return 0.0f;
230+
231+
// Get FIFO indices from first channel - all channels should be in sync
232+
int index1, size1, index2, size2;
233+
m_fifos[0]->prepareToWrite (nSamples, index1, size1, index2, size2);
234+
235+
if ((size1 + size2) < nSamples)
236+
{
237+
LOGE (__FUNCTION__, " Recording Data Queue Overflow: sz1: ", size1, " sz2: ", size2, " nSamples: ", nSamples);
238+
}
239+
240+
float maxUsage = 0.0f;
241+
242+
// Fill sample numbers once for channel 0
243+
fillSampleNumbers (0, index1, size1, sampleNumber);
244+
if (size2 > 0)
245+
{
246+
fillSampleNumbers (0, index2, size2, sampleNumber + size1);
247+
}
248+
249+
// Batch copy and update all channels
250+
for (int destChannel = 0; destChannel < m_numChans; ++destChannel)
251+
{
252+
int srcChannel = srcChannels[destChannel];
253+
254+
// Copy first segment
255+
m_buffer.copyFrom (destChannel, index1, buffer, srcChannel, 0, size1);
256+
257+
// Copy second segment (wrap-around) if present
258+
if (size2 > 0)
259+
{
260+
m_buffer.copyFrom (destChannel, index2, buffer, srcChannel, size1, size2);
261+
}
262+
263+
// Copy sample numbers from channel 0 (faster than calling fillSampleNumbers for each)
264+
if (destChannel > 0)
265+
{
266+
std::memcpy (m_sampleNumbers[destChannel]->data(),
267+
m_sampleNumbers[0]->data(),
268+
m_numBlocks * sizeof (int64));
269+
}
270+
271+
// Update FIFO state - need to call prepareToWrite for channels > 0
272+
if (destChannel > 0)
273+
{
274+
int d1, d2, d3, d4;
275+
m_fifos[destChannel]->prepareToWrite (nSamples, d1, d2, d3, d4);
276+
}
277+
m_fifos[destChannel]->finishedWrite (size1 + size2);
278+
}
279+
280+
// Return usage from last channel (all should be the same)
281+
return 1.0f - (float) m_fifos[m_numChans - 1]->getFreeSpace() / (float) m_fifos[m_numChans - 1]->getTotalSize();
282+
}
283+
217284
/*
218-
We could copy the internal circular buffer to an external one, as DataBuffer does. This class
285+
We could copy the internal circular buffer to an external one, as DataBuffer does.
219286
is, however, intended for disk writing, which is one of the most CPU-critical systems. Just
220287
allowing the record subsystem to access the internal buffer is way faster, although it has to be
221288
done with special care and manually finish the read process.

Source/Processors/RecordNode/DataQueue.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,22 @@ class DataQueue
6969
/** Writes an array of data for one channel */
7070
float writeChannel (const AudioBuffer<float>& buffer, int srcChannel, int destChannel, int nSamples, int64 sampleNumber);
7171

72+
/**
73+
* Batch write: writes data for all channels in a single operation.
74+
* Much faster than calling writeChannel() repeatedly because it uses
75+
* a single FIFO operation for all channels.
76+
*
77+
* @param buffer Source audio buffer
78+
* @param srcChannels Array mapping dest channel index to source channel index
79+
* @param nSamples Number of samples to write per channel
80+
* @param sampleNumber Starting sample number for this block
81+
* @return Maximum FIFO usage across all channels (0.0 to 1.0)
82+
*/
83+
float writeAllChannels (const AudioBuffer<float>& buffer,
84+
const Array<int>& srcChannels,
85+
int nSamples,
86+
int64 sampleNumber);
87+
7288
/** Writes an array of timestamps for one stream */
7389
float writeSynchronizedTimestamps (double start, double step, int destChannel, int nSamples);
7490

Source/Processors/RecordNode/RecordNode.cpp

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -791,6 +791,7 @@ bool RecordNode::stopAcquisition()
791791
// called by GenericProcessor::setRecording() and CoreServices::setRecordingStatus()
792792
void RecordNode::startRecording()
793793
{
794+
794795
Array<int> chanProcessorMap;
795796
Array<int> chanOrderinProc;
796797
OwnedArray<RecordProcessorInfo> procInfo;
@@ -867,6 +868,9 @@ void RecordNode::startRecording()
867868
int bufferSize = ads.bufferSize;
868869

869870
dataQueues.clear();
871+
streamSourceChannels.clear();
872+
int globalChannelIdx = 0;
873+
870874
for (int i = 0; i < dataStreams.size(); i++)
871875
{
872876
// Only create queue if this stream has recorded channels
@@ -876,12 +880,24 @@ void RecordNode::startRecording()
876880
queue->setChannelCount (recordedChannelsPerStream[i]);
877881
queue->setTimestampStreamCount (1); // Each queue has one timestamp stream
878882
dataQueues.add (queue);
883+
884+
// Pre-compute source channel array for this stream
885+
Array<int>* srcChannels = new Array<int>();
886+
srcChannels->ensureStorageAllocated (recordedChannelsPerStream[i]);
887+
for (int j = 0; j < recordedChannelsPerStream[i]; j++)
888+
{
889+
srcChannels->add (channelMap[globalChannelIdx + j]);
890+
}
891+
streamSourceChannels.add (srcChannels);
879892
}
880893
else
881894
{
882895
// Add nullptr placeholder to maintain stream index alignment
883896
dataQueues.add (nullptr);
897+
streamSourceChannels.add (nullptr);
884898
}
899+
900+
globalChannelIdx += recordedChannelsPerStream[i];
885901
}
886902

887903
recordThread->setQueuePointers (&dataQueues, eventQueue.get(), spikeQueue.get());
@@ -1099,7 +1115,10 @@ void RecordNode::process (AudioBuffer<float>& buffer)
10991115
{
11001116
streamIndex++;
11011117

1102-
int recordChanCount = (*stream)["channels"].getArray()->size();
1118+
// Use pre-computed array size to avoid JSON lookup on every call
1119+
int recordChanCount = (streamSourceChannels[streamIndex] != nullptr)
1120+
? streamSourceChannels[streamIndex]->size()
1121+
: 0;
11031122

11041123
if (recordChanCount == 0)
11051124
continue;
@@ -1111,8 +1130,6 @@ void RecordNode::process (AudioBuffer<float>& buffer)
11111130

11121131
int64 sampleNumber = getFirstSampleNumberForBlock (streamId);
11131132

1114-
float totalFifoUsage = 0.0f;
1115-
11161133
double first, second;
11171134

11181135
// Skip if no queue for this stream (no recorded channels)
@@ -1142,22 +1159,21 @@ void RecordNode::process (AudioBuffer<float>& buffer)
11421159
numSamples);
11431160
}
11441161

1145-
for (int i = 0; i < recordChanCount; i++)
1162+
if (numSamples > 0 && recordChanCount > 0 && streamSourceChannels[streamIndex] != nullptr)
11461163
{
1147-
channelIndex++;
1148-
1149-
if (numSamples > 0)
1150-
{
1151-
// Write to per-stream queue with local channel index (i)
1152-
totalFifoUsage += dataQueues[streamIndex]->writeChannel (buffer,
1153-
channelMap[channelIndex],
1154-
i, // local channel index within stream
1155-
numSamples,
1156-
sampleNumber);
1157-
}
1164+
// Batch write all channels at once using pre-computed source channels
1165+
fifoUsage[streamId] = dataQueues[streamIndex]->writeAllChannels (
1166+
buffer,
1167+
*streamSourceChannels[streamIndex],
1168+
numSamples,
1169+
sampleNumber);
1170+
}
1171+
else
1172+
{
1173+
fifoUsage[streamId] = 0.0f;
11581174
}
11591175

1160-
fifoUsage[streamId] = totalFifoUsage / recordChanCount;
1176+
channelIndex += recordChanCount;
11611177

11621178
if (fifoUsage[streamId] > 0.9)
11631179
fifoAlmostFull = true;

Source/Processors/RecordNode/RecordNode.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,7 @@ class TESTABLE RecordNode : public GenericProcessor,
293293
int recordingNumber;
294294

295295
OwnedArray<DataQueue> dataQueues; // One DataQueue per stream for race-condition-free reading
296+
OwnedArray<Array<int>> streamSourceChannels; // Pre-computed source channel indices per stream
296297
std::unique_ptr<EventMsgQueue> eventQueue;
297298
std::unique_ptr<SpikeMsgQueue> spikeQueue;
298299

0 commit comments

Comments
 (0)