Skip to content

Commit 2d45f62

Browse files
committed
Implement batch writing (10-15% recording speedup)
1 parent 733ef3c commit 2d45f62

9 files changed

Lines changed: 635 additions & 47 deletions

File tree

Source/Processors/RecordNode/BinaryFormat/BinaryRecording.cpp

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -786,6 +786,109 @@ void BinaryRecording::writeSpike (int electrodeIndex, const Spike* spike)
786786
increaseEventCounts (rec);
787787
}
788788

789+
void BinaryRecording::writeContinuousDataBatch (const int* writeChannels,
790+
const int* realChannels,
791+
const float* const* dataBuffers,
792+
const double* timestampBuffer,
793+
int numChannels,
794+
int numSamples,
795+
int fileIndex)
796+
{
797+
if (numSamples == 0 || numChannels == 0)
798+
return;
799+
800+
// Ensure we have enough batch buffer space
801+
if (numSamples > m_batchBufferSamples || numChannels > m_batchBufferChannels)
802+
{
803+
int newSamples = jmax (numSamples, m_batchBufferSamples);
804+
int newChannels = jmax (numChannels, m_batchBufferChannels);
805+
806+
LOGD ("BinaryRecording::writeContinuousDataBatch: Resizing batch buffer to ",
807+
newChannels, " channels x ", newSamples, " samples");
808+
809+
m_batchIntBuffer.malloc (newSamples * newChannels);
810+
m_batchBufferSamples = newSamples;
811+
m_batchBufferChannels = newChannels;
812+
}
813+
814+
// Ensure sample number buffer is large enough
815+
if (numSamples > m_bufferSize)
816+
{
817+
m_sampleNumberBuffer.malloc (numSamples);
818+
m_bufferSize = numSamples;
819+
}
820+
821+
// Resize batch pointer arrays if needed
822+
if (m_batchScaleFactors.size() < static_cast<size_t> (numChannels))
823+
{
824+
m_batchScaleFactors.resize (numChannels);
825+
m_batchIntBufferPtrs.resize (numChannels);
826+
}
827+
828+
// Get file and validate
829+
if (fileIndex < 0 || fileIndex >= m_continuousFiles.size() || ! m_continuousFiles[fileIndex])
830+
return;
831+
832+
// Setup scale factors and output buffer pointers for each channel
833+
// Each channel gets a contiguous region of m_batchIntBuffer
834+
for (int i = 0; i < numChannels; i++)
835+
{
836+
m_batchScaleFactors[i] = 1.0f / getContinuousChannel (realChannels[i])->getBitVolts();
837+
m_batchIntBufferPtrs[i] = m_batchIntBuffer.getData() + i * numSamples;
838+
}
839+
840+
// Batch convert all channels using SIMD
841+
SIMDConverter::convertFloatToInt16Batch (
842+
dataBuffers,
843+
m_batchIntBufferPtrs.data(),
844+
m_batchScaleFactors.data(),
845+
numChannels,
846+
numSamples);
847+
848+
// Get starting sample position (all channels in a stream have same position)
849+
uint64 startPos = m_samplesWritten[writeChannels[0]];
850+
851+
// Write all channels at once using batch interleaving
852+
m_continuousFiles[fileIndex]->writeChannelBatch (
853+
startPos,
854+
m_batchIntBufferPtrs.data(),
855+
numChannels,
856+
numSamples);
857+
858+
// Update samples written for all channels
859+
for (int i = 0; i < numChannels; i++)
860+
{
861+
m_samplesWritten.set (writeChannels[i], m_samplesWritten[writeChannels[i]] + numSamples);
862+
}
863+
864+
// Write timestamps (only once per stream, using the first channel)
865+
// Find which channel in this batch is the first channel for this file (channel index 0 within stream)
866+
for (int i = 0; i < numChannels; i++)
867+
{
868+
if (m_channelIndexes[writeChannels[i]] == 0)
869+
{
870+
int64 baseSampleNumber = getLatestSampleNumber (writeChannels[i]);
871+
uint32 streamId = getContinuousChannel (realChannels[i])->getStreamId();
872+
873+
if (! wroteFirstSampleNumber[streamId])
874+
{
875+
firstSampleNumber[streamId] = baseSampleNumber;
876+
wroteFirstSampleNumber[streamId] = true;
877+
}
878+
879+
for (int s = 0; s < numSamples; s++)
880+
m_sampleNumberBuffer[s] = baseSampleNumber + s;
881+
882+
m_dataTimestampFiles[fileIndex]->writeData (m_sampleNumberBuffer, numSamples * sizeof (int64));
883+
m_dataTimestampFiles[fileIndex]->increaseRecordCount (numSamples);
884+
885+
m_dataSyncTimestampFiles[fileIndex]->writeData (timestampBuffer, numSamples * sizeof (double));
886+
m_dataSyncTimestampFiles[fileIndex]->increaseRecordCount (numSamples);
887+
break;
888+
}
889+
}
890+
}
891+
789892
void BinaryRecording::writeTimestampSyncText (uint64 streamId, int64 sampleNumber, float sourceSampleRate, String text)
790893
{
791894
if (! m_syncTextFile)

Source/Processors/RecordNode/BinaryFormat/BinaryRecording.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,18 @@ class BinaryRecording : public RecordEngine
6262
const double* timestampBuffer,
6363
int size);
6464

65+
/**
66+
* Writes continuous data for all channels of a stream in a single batch operation.
67+
* Uses SIMD-optimized conversion and cache-efficient interleaving.
68+
*/
69+
void writeContinuousDataBatch (const int* writeChannels,
70+
const int* realChannels,
71+
const float* const* dataBuffers,
72+
const double* timestampBuffer,
73+
int numChannels,
74+
int numSamples,
75+
int fileIndex) override;
76+
6577
/** Writes an event to disk */
6678
void writeEvent (int eventIndex, const EventPacket& packet);
6779

@@ -102,6 +114,14 @@ class BinaryRecording : public RecordEngine
102114
int m_bufferSize;
103115
int m_syncTimestampBufferSize;
104116

117+
// Batch conversion buffers
118+
static const int MAX_BATCH_CHANNELS = 512; // Maximum channels per batch
119+
std::vector<float> m_batchScaleFactors;
120+
std::vector<int16*> m_batchIntBufferPtrs;
121+
HeapBlock<int16> m_batchIntBuffer; // Separate buffer for batch writes
122+
int m_batchBufferSamples { 0 }; // Samples per channel in batch buffer
123+
int m_batchBufferChannels { 0 }; // Number of channels in batch buffer
124+
105125
Array<unsigned int> m_channelIndexes;
106126
Array<unsigned int> m_fileIndexes;
107127

Source/Processors/RecordNode/BinaryFormat/SequentialBlockFile.cpp

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,108 @@ bool SequentialBlockFile::writeChannel (uint64 startPos, int channel, int16* dat
129129
return true;
130130
}
131131

132+
bool SequentialBlockFile::writeChannelBatch (uint64 startPos, int16* const* channelData, int numChannels, int nSamples)
133+
{
134+
if (! m_file)
135+
{
136+
printf ("[RN]SequentialBlockFile::writeChannelBatch returned false: (!m_file)\n");
137+
return false;
138+
}
139+
140+
if (numChannels != m_nChannels)
141+
{
142+
printf ("[RN]SequentialBlockFile::writeChannelBatch: channel count mismatch (%d vs %d)\n",
143+
numChannels, m_nChannels);
144+
return false;
145+
}
146+
147+
int bIndex = m_memBlocks.size() - 1;
148+
if ((bIndex < 0) || (m_memBlocks[bIndex]->getOffset() + m_samplesPerBlock) < (startPos + nSamples))
149+
allocateBlocks (startPos, nSamples);
150+
151+
for (bIndex = m_memBlocks.size() - 1; bIndex >= 0; bIndex--)
152+
{
153+
if (m_memBlocks[bIndex]->getOffset() <= startPos)
154+
break;
155+
}
156+
if (bIndex < 0)
157+
{
158+
return false;
159+
}
160+
161+
int writtenSamples = 0;
162+
uint64 startIdx = startPos - m_memBlocks[bIndex]->getOffset();
163+
int dataIdx = 0;
164+
int lastBlockIdx = m_memBlocks.size() - 1;
165+
166+
// Process in blocks for better cache utilization
167+
// Block size chosen to fit in L1 cache (typically 32-64KB)
168+
const int cacheBlockSamples = 64; // Process 64 samples at a time
169+
170+
// Cache blocking parameters - chosen to fit in L1 cache (~32KB)
171+
// For a tile of TILE_SAMPLES x TILE_CHANNELS:
172+
// - Input: TILE_CHANNELS pointers (8 bytes each) + TILE_SAMPLES * TILE_CHANNELS * 2 bytes data
173+
// - Output: TILE_SAMPLES * nChannels * 2 bytes (but we only write TILE_CHANNELS at a time)
174+
// With 256 samples x 64 channels: 256 * 64 * 2 = 32KB input data per tile
175+
const int TILE_SAMPLES = 256;
176+
const int TILE_CHANNELS = 64;
177+
178+
while (writtenSamples < nSamples)
179+
{
180+
int16* blockPtr = m_memBlocks[bIndex]->getData();
181+
int samplesToWrite = jmin ((nSamples - writtenSamples), (m_samplesPerBlock - int (startIdx)));
182+
183+
uint64 baseMemPos = startIdx * m_nChannels;
184+
185+
// Process in tiles to optimize cache usage
186+
// For each tile of samples:
187+
for (int sampleTileStart = 0; sampleTileStart < samplesToWrite; sampleTileStart += TILE_SAMPLES)
188+
{
189+
int sampleTileEnd = jmin (sampleTileStart + TILE_SAMPLES, samplesToWrite);
190+
191+
// For each tile of channels:
192+
for (int channelTileStart = 0; channelTileStart < m_nChannels; channelTileStart += TILE_CHANNELS)
193+
{
194+
int channelTileEnd = jmin (channelTileStart + TILE_CHANNELS, m_nChannels);
195+
196+
// Process this tile - iterate samples in outer loop to optimize output writes
197+
for (int s = sampleTileStart; s < sampleTileEnd; s++)
198+
{
199+
uint64 memPos = baseMemPos + s * m_nChannels + channelTileStart;
200+
int srcIdx = dataIdx + s;
201+
202+
// Write channels in this tile for this sample
203+
for (int ch = channelTileStart; ch < channelTileEnd; ch++)
204+
{
205+
blockPtr[memPos + (ch - channelTileStart)] = channelData[ch][srcIdx];
206+
}
207+
}
208+
}
209+
}
210+
211+
writtenSamples += samplesToWrite;
212+
dataIdx += samplesToWrite;
213+
214+
// Update the last block fill index
215+
size_t samplePos = startIdx + samplesToWrite;
216+
if (bIndex == lastBlockIdx && samplePos > m_lastBlockFill)
217+
{
218+
m_lastBlockFill = samplePos;
219+
}
220+
221+
startIdx = 0;
222+
bIndex++;
223+
}
224+
225+
// Update current block for all channels
226+
for (int ch = 0; ch < m_nChannels; ch++)
227+
{
228+
m_currentBlock.set (ch, bIndex - 1);
229+
}
230+
231+
return true;
232+
}
233+
132234
void SequentialBlockFile::allocateBlocks (uint64 startIndex, int numSamples)
133235
{
134236
//First deallocate full blocks

Source/Processors/RecordNode/BinaryFormat/SequentialBlockFile.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,19 @@ class PLUGIN_API SequentialBlockFile
6666
/** Writes nSamples of data for a particular channel */
6767
bool writeChannel (uint64 startPos, int channel, int16* data, int nSamples);
6868

69+
/**
70+
* Writes data for all channels at once with optimized interleaving.
71+
* This is more efficient than calling writeChannel() for each channel separately
72+
* because it performs interleaving in a cache-friendly manner.
73+
*
74+
* @param startPos Starting sample position in the file
75+
* @param channelData Array of pointers to int16 data for each channel
76+
* @param numChannels Number of channels (must match m_nChannels)
77+
* @param nSamples Number of samples per channel
78+
* @return true if write was successful
79+
*/
80+
bool writeChannelBatch (uint64 startPos, int16* const* channelData, int numChannels, int nSamples);
81+
6982
private:
7083
std::shared_ptr<FileOutputStream> m_file;
7184
const int m_nChannels;

Source/Processors/RecordNode/RecordEngine.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,22 @@ int RecordEngine::getNumRecordedSpikeChannels() const
131131
return recordNode->getTotalSpikeChannels();
132132
}
133133

134+
void RecordEngine::writeContinuousDataBatch (const int* writeChannels,
135+
const int* realChannels,
136+
const float* const* dataBuffers,
137+
const double* timestampBuffer,
138+
int numChannels,
139+
int numSamples,
140+
int fileIndex)
141+
{
142+
// Default implementation: fall back to per-channel writes
143+
// Subclasses can override for optimized batch processing
144+
for (int i = 0; i < numChannels; i++)
145+
{
146+
writeContinuousData (writeChannels[i], realChannels[i], dataBuffers[i], timestampBuffer, numSamples);
147+
}
148+
}
149+
134150
void RecordEngine::configureEngine()
135151
{
136152
if (! manager)

Source/Processors/RecordNode/RecordEngine.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,26 @@ class PLUGIN_API RecordEngine
9898
const double* timestampBuffer,
9999
int size) = 0;
100100

101+
/**
102+
* Write continuous data for all channels of a stream in a single batch operation.
103+
* This is more efficient than calling writeContinuousData() for each channel.
104+
*
105+
* @param writeChannels Array of write channel indices (indices among all recorded channels)
106+
* @param realChannels Array of real channel indices (indices within processor)
107+
* @param dataBuffers Array of pointers to float data buffers (one per channel)
108+
* @param timestampBuffer Pointer to synchronized timestamp buffer (shared across channels)
109+
* @param numChannels Number of channels in this batch
110+
* @param numSamples Number of samples per channel
111+
* @param fileIndex Index of the file/stream to write to
112+
*/
113+
virtual void writeContinuousDataBatch (const int* writeChannels,
114+
const int* realChannels,
115+
const float* const* dataBuffers,
116+
const double* timestampBuffer,
117+
int numChannels,
118+
int numSamples,
119+
int fileIndex);
120+
101121
/** Write a single event to disk (TTL or TEXT) */
102122
virtual void writeEvent (int eventChannel, const EventPacket& event) = 0;
103123

0 commit comments

Comments
 (0)