Skip to content

Commit b543af9

Browse files
committed
Implement per-stream DataQueues
1 parent bd7feb8 commit b543af9

12 files changed

Lines changed: 352 additions & 149 deletions

Source/Processors/RecordNode/BinaryFormat/BinaryRecording.cpp

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -685,9 +685,8 @@ void BinaryRecording::writeContinuousData (int writeChannel,
685685
wroteFirstSampleNumber[streamId] = true;
686686
}
687687

688-
for (int i = 0; i < size; i++)
689-
/* Generate int sample number */
690-
m_sampleNumberBuffer[i] = baseSampleNumber + i;
688+
/* Generate sequential sample numbers using SIMD-optimized fill */
689+
SIMDConverter::fillSequentialInt64 (m_sampleNumberBuffer, baseSampleNumber, size);
691690

692691
/* Write int timestamps to disc */
693692
m_dataTimestampFiles[fileIndex]->writeData (m_sampleNumberBuffer, size * sizeof (int64));
@@ -764,9 +763,10 @@ void BinaryRecording::writeSpike (int electrodeIndex, const Spike* spike)
764763
m_intBuffer.malloc (totalSamples);
765764
}
766765

767-
double multFactor = 1 / (float (0x7fff) * channel->getChannelBitVolts (0));
768-
FloatVectorOperations::copyWithMultiply (m_scaledBuffer.getData(), spike->getDataPointer(), multFactor, totalSamples);
769-
AudioDataConverters::convertFloatToInt16LE (m_scaledBuffer.getData(), m_intBuffer.getData(), totalSamples);
766+
/* Convert spike waveforms from float to int16 using SIMD-optimized conversion.
767+
Scale factor converts microvolts to int16 units: output = input / bitVolts */
768+
float scaleFactor = 1.0f / channel->getChannelBitVolts (0);
769+
SIMDConverter::convertFloatToInt16 (spike->getDataPointer(), m_intBuffer.getData(), scaleFactor, totalSamples);
770770
rec->data->writeData (m_intBuffer.getData(), totalSamples * sizeof (int16));
771771

772772
int64 sampleIdx = spike->getSampleNumber();
@@ -848,6 +848,8 @@ void BinaryRecording::writeContinuousDataBatch (const int* writeChannels,
848848
// Get starting sample position (all channels in a stream have same position)
849849
uint64 startPos = m_samplesWritten[writeChannels[0]];
850850

851+
//LOGD("BinaryRecording::writeContinuousDataBatch: Writing ", numSamples, " samples for ", numChannels, " channels at position ", startPos, " to file index ", fileIndex);
852+
851853
// Try batch interleaving if we have all channels for this file
852854
// The file's channel count is determined by the stream's channel count
853855
// If we have a partial batch, fall back to per-channel writes
@@ -894,8 +896,8 @@ void BinaryRecording::writeContinuousDataBatch (const int* writeChannels,
894896
wroteFirstSampleNumber[streamId] = true;
895897
}
896898

897-
for (int s = 0; s < numSamples; s++)
898-
m_sampleNumberBuffer[s] = baseSampleNumber + s;
899+
/* Generate sequential sample numbers using SIMD-optimized fill */
900+
SIMDConverter::fillSequentialInt64 (m_sampleNumberBuffer, baseSampleNumber, numSamples);
899901

900902
m_dataTimestampFiles[fileIndex]->writeData (m_sampleNumberBuffer, numSamples * sizeof (int64));
901903
m_dataTimestampFiles[fileIndex]->increaseRecordCount (numSamples);
@@ -921,8 +923,7 @@ void BinaryRecording::writeTimestampSyncText (uint64 streamId, int64 sampleNumbe
921923
jassert (fsn == sampleNumber);
922924

923925
m_syncTextFile->writeText (syncString + "\r\n", false, false, nullptr);
924-
925-
m_syncTextFile->flush();
926+
// Note: flush removed - file will be flushed on close or by OS buffering
926927
}
927928

928929
RecordEngineManager* BinaryRecording::getEngineManager()

Source/Processors/RecordNode/BinaryFormat/NpyFile.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ bool NpyFile::openFile (String path)
7676
LOGD ("Re-creating file: ", path);
7777
}
7878

79-
m_file = file.createOutputStream();
79+
// Use 64KB buffer to reduce system calls
80+
m_file = file.createOutputStream (65536);
8081

8182
if (! m_file)
8283
return false;

Source/Processors/RecordNode/BinaryFormat/NpyFile.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ class PLUGIN_API NpyFile
124124
unsigned int m_dim2;
125125

126126
/** flush file buffer to disk and update the .npy header every this many records: */
127-
const int recordBufferSize { 1024 };
127+
const int recordBufferSize { 32768 };
128128
};
129129

130130
#endif

Source/Processors/RecordNode/BinaryFormat/SIMDConverter.cpp

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,3 +605,71 @@ void SIMDConverter::interleaveInt16 (const int16_t* const* channelData,
605605
return;
606606
}
607607
}
608+
609+
// ============================================================================
610+
// Sequential int64 fill (for sample numbers)
611+
// ============================================================================
612+
613+
void SIMDConverter::fillSequentialInt64 (int64_t* output, int64_t baseValue, int numSamples)
614+
{
615+
if (numSamples <= 0)
616+
return;
617+
618+
#if defined(__ARM_NEON) || defined(__ARM_NEON__)
619+
// ARM NEON: Process 2 int64 values at a time
620+
const int simdWidth = 2;
621+
int i = 0;
622+
623+
// Initialize increment vector [0, 1] + baseValue -> [base, base+1]
624+
int64x2_t vbase = { baseValue, baseValue + 1 };
625+
const int64x2_t vIncrement = { 2, 2 };
626+
627+
const int numFullIterations = numSamples / simdWidth;
628+
629+
for (int iter = 0; iter < numFullIterations; ++iter)
630+
{
631+
vst1q_s64 (reinterpret_cast<int64_t*> (output + i), vbase);
632+
vbase = vaddq_s64 (vbase, vIncrement);
633+
i += simdWidth;
634+
}
635+
636+
// Handle remaining samples
637+
int64_t currentValue = baseValue + i;
638+
for (; i < numSamples; ++i)
639+
{
640+
output[i] = currentValue++;
641+
}
642+
643+
#elif defined(__SSE2__) || defined(_M_X64) || defined(_M_IX86)
644+
// x86 SSE2: Process 2 int64 values at a time
645+
const int simdWidth = 2;
646+
int i = 0;
647+
648+
// Initialize: [base, base+1]
649+
__m128i vbase = _mm_set_epi64x (baseValue + 1, baseValue);
650+
const __m128i vIncrement = _mm_set_epi64x (2, 2);
651+
652+
const int numFullIterations = numSamples / simdWidth;
653+
654+
for (int iter = 0; iter < numFullIterations; ++iter)
655+
{
656+
_mm_storeu_si128 (reinterpret_cast<__m128i*> (output + i), vbase);
657+
vbase = _mm_add_epi64 (vbase, vIncrement);
658+
i += simdWidth;
659+
}
660+
661+
// Handle remaining samples
662+
int64_t currentValue = baseValue + i;
663+
for (; i < numSamples; ++i)
664+
{
665+
output[i] = currentValue++;
666+
}
667+
668+
#else
669+
// Scalar fallback - simple loop that compilers can auto-vectorize
670+
for (int i = 0; i < numSamples; ++i)
671+
{
672+
output[i] = baseValue + i;
673+
}
674+
#endif
675+
}

Source/Processors/RecordNode/BinaryFormat/SIMDConverter.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,18 @@ class SIMDConverter
136136
*/
137137
static TileConfig getRecommendedTileConfig (int numChannels);
138138

139+
/**
140+
* Fills a buffer with sequential int64 values starting from a base value.
141+
* This is optimized for generating sample number arrays.
142+
*
143+
* output[i] = baseValue + i for i in [0, numSamples)
144+
*
145+
* @param output Pointer to output int64 buffer
146+
* @param baseValue Starting value (output[0] = baseValue)
147+
* @param numSamples Number of samples to fill
148+
*/
149+
static void fillSequentialInt64 (int64_t* output, int64_t baseValue, int numSamples);
150+
139151
private:
140152
// Implementation functions for each SIMD type
141153
static void convertScalar (const float* input, int16_t* output, float scale, int numSamples);

Source/Processors/RecordNode/BinaryFormat/SequentialBlockFile.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,10 +138,10 @@ bool SequentialBlockFile::writeChannelBatch (uint64 startPos, int16* const* chan
138138
return false;
139139
}
140140

141+
// Batch writing requires all channels - return false to signal caller should use per-channel writes
141142
if (numChannels != m_nChannels)
142143
{
143-
printf ("[RN]SequentialBlockFile::writeChannelBatch: channel count mismatch (%d vs %d)\n",
144-
numChannels, m_nChannels);
144+
printf ("[RN]SequentialBlockFile::writeChannelBatch channel count mismatch: expected %d, got %d\n", m_nChannels, numChannels);
145145
return false;
146146
}
147147

Source/Processors/RecordNode/BinaryFormat/SequentialBlockFile.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ class PLUGIN_API SequentialBlockFile
9292
void allocateBlocks (uint64 startIndex, int numSamples);
9393

9494
/** Compile-time params */
95-
const int streamBufferSize { 0 };
95+
const int streamBufferSize { 65536 }; // 64KB buffer to reduce system calls
9696
const int blockArrayInitSize { 128 };
9797
};
9898
#endif // !SEQUENTIALBLOCKFILE_H

Source/Processors/RecordNode/DataQueue.cpp

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -242,12 +242,48 @@ bool DataQueue::startRead (std::vector<CircularBufferIndexes>& dataBufferIdxs,
242242

243243
m_readInProgress = true;
244244

245+
// First pass: find the minimum samples available across ALL channels
246+
// This ensures we don't read from some channels while others are still being written
247+
int minSamplesAvailable = INT_MAX;
245248
for (int chan = 0; chan < m_numChans; ++chan)
246249
{
247250
int readyToRead = m_fifos.getUnchecked (chan)->getNumReady();
251+
if (readyToRead < minSamplesAvailable)
252+
minSamplesAvailable = readyToRead;
253+
}
254+
255+
// Apply nMax limit to the minimum
256+
int samplesToRead = ((minSamplesAvailable > nMax) && (nMax > 0)) ? nMax : minSamplesAvailable;
248257

249-
int samplesToRead = ((readyToRead > nMax) && (nMax > 0)) ? nMax : readyToRead;
258+
// If no samples available on any channel, nothing to read
259+
if (samplesToRead == 0)
260+
{
261+
// Initialize all indexes to zero
262+
for (int chan = 0; chan < m_numChans; ++chan)
263+
{
264+
CircularBufferIndexes& idx = dataBufferIdxs[chan];
265+
idx.index1 = 0;
266+
idx.size1 = 0;
267+
idx.index2 = 0;
268+
idx.size2 = 0;
269+
m_readSamples[chan] = 0;
270+
}
271+
for (int chan = 0; chan < m_numFTSChans; ++chan)
272+
{
273+
CircularBufferIndexes& idx = timestampBufferIdxs[chan];
274+
idx.index1 = 0;
275+
idx.size1 = 0;
276+
idx.index2 = 0;
277+
idx.size2 = 0;
278+
m_readFTSSamples[chan] = 0;
279+
}
280+
m_readInProgress = false;
281+
return false;
282+
}
250283

284+
// Second pass: read the same number of samples from all channels
285+
for (int chan = 0; chan < m_numChans; ++chan)
286+
{
251287
CircularBufferIndexes& idx = dataBufferIdxs[chan];
252288

253289
m_fifos.getUnchecked (chan)->prepareToRead (samplesToRead, idx.index1, idx.size1, idx.index2, idx.size2);
@@ -274,13 +310,21 @@ bool DataQueue::startRead (std::vector<CircularBufferIndexes>& dataBufferIdxs,
274310
m_lastReadSampleNumbers[chan] = sampleNum + idx.size1 + idx.size2;
275311
}
276312

313+
// Also find minimum for timestamp streams and read consistently
314+
int minFTSSamples = INT_MAX;
277315
for (int chan = 0; chan < m_numFTSChans; ++chan)
278316
{
279-
CircularBufferIndexes& idx = timestampBufferIdxs[chan];
280317
int readyToRead = m_FTSFifos.getUnchecked (chan)->getNumReady();
281-
int samplesToRead = ((readyToRead > nMax) && (nMax > 0)) ? nMax : readyToRead;
318+
if (readyToRead < minFTSSamples)
319+
minFTSSamples = readyToRead;
320+
}
321+
int ftsToRead = ((minFTSSamples > nMax) && (nMax > 0)) ? nMax : minFTSSamples;
322+
323+
for (int chan = 0; chan < m_numFTSChans; ++chan)
324+
{
325+
CircularBufferIndexes& idx = timestampBufferIdxs[chan];
282326

283-
m_FTSFifos.getUnchecked (chan)->prepareToRead (samplesToRead, idx.index1, idx.size1, idx.index2, idx.size2);
327+
m_FTSFifos.getUnchecked (chan)->prepareToRead (ftsToRead, idx.index1, idx.size1, idx.index2, idx.size2);
284328
m_readFTSSamples[chan] = idx.size1 + idx.size2;
285329
}
286330

0 commit comments

Comments
 (0)