Skip to content

Commit 4a9a46a

Browse files
committed
Fix BinaryRecording file indexing when streams are disabled
1 parent ce291ca commit 4a9a46a

3 files changed

Lines changed: 106 additions & 12 deletions

File tree

Source/Processors/RecordNode/BinaryFormat/BinaryRecording.cpp

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -110,12 +110,9 @@ void BinaryRecording::openFiles (File rootFolder, int experimentNumber, int reco
110110
singleStreamJSON.clear();
111111
}
112112

113-
LOGD ("Recording channel: ", channelInfo->getName(), " from stream ", streamId, " with global index ", globalIndex, " and local index ", indexWithinStream);
114-
115113
m_fileIndexes.set (ch, streamIndex);
116114
m_channelIndexes.set (ch, indexWithinStream++);
117115

118-
119116
DynamicObject::Ptr singleChannelJSON = new DynamicObject();
120117

121118
singleChannelJSON->setProperty ("channel_name", channelInfo->getName());
@@ -787,6 +784,7 @@ void BinaryRecording::writeContinuousDataBatch (const int* writeChannels,
787784
int numSamples,
788785
int fileIndex)
789786
{
787+
ignoreUnused (fileIndex);
790788
if (numSamples == 0 || numChannels == 0)
791789
return;
792790

@@ -821,8 +819,12 @@ void BinaryRecording::writeContinuousDataBatch (const int* writeChannels,
821819
m_batchIntBufferPtrs.resize (numChannels);
822820
}
823821

822+
// Resolve the file index from recorded channel mapping.
823+
// This is robust even when some source streams have zero recorded channels.
824+
const int resolvedFileIndex = m_fileIndexes[writeChannels[0]];
825+
824826
// Get file and validate
825-
if (fileIndex < 0 || fileIndex >= m_continuousFiles.size() || ! m_continuousFiles[fileIndex])
827+
if (resolvedFileIndex < 0 || resolvedFileIndex >= m_continuousFiles.size() || ! m_continuousFiles[resolvedFileIndex])
826828
return;
827829

828830
// Setup scale factors and output buffer pointers for each channel
@@ -844,12 +846,12 @@ void BinaryRecording::writeContinuousDataBatch (const int* writeChannels,
844846
// Get starting sample position (all channels in a stream have same position)
845847
uint64 startPos = m_samplesWritten[writeChannels[0]];
846848

847-
//LOGD("BinaryRecording::writeContinuousDataBatch: Writing ", numSamples, " samples for ", numChannels, " channels at position ", startPos, " to file index ", fileIndex);
849+
//LOGD("BinaryRecording::writeContinuousDataBatch: Writing ", numSamples, " samples for ", numChannels, " channels at position ", startPos, " to file index ", resolvedFileIndex);
848850

849851
// Try batch interleaving if we have all channels for this file
850852
// The file's channel count is determined by the stream's channel count
851853
// If we have a partial batch, fall back to per-channel writes
852-
bool useBatchWrite = m_continuousFiles[fileIndex] != nullptr && m_continuousFiles[fileIndex]->writeChannelBatch (startPos, m_batchIntBufferPtrs.data(), numChannels, numSamples);
854+
bool useBatchWrite = m_continuousFiles[resolvedFileIndex] != nullptr && m_continuousFiles[resolvedFileIndex]->writeChannelBatch (startPos, m_batchIntBufferPtrs.data(), numChannels, numSamples);
853855

854856
if (! useBatchWrite)
855857
{
@@ -858,7 +860,7 @@ void BinaryRecording::writeContinuousDataBatch (const int* writeChannels,
858860
{
859861
int writeChannel = writeChannels[i];
860862
int channelIdx = m_channelIndexes[writeChannel];
861-
m_continuousFiles[fileIndex]->writeChannel (
863+
m_continuousFiles[resolvedFileIndex]->writeChannel (
862864
startPos,
863865
channelIdx,
864866
m_batchIntBufferPtrs[i],
@@ -884,11 +886,11 @@ void BinaryRecording::writeContinuousDataBatch (const int* writeChannels,
884886
/* Generate sequential sample numbers using SIMD-optimized fill */
885887
SIMDConverter::fillSequentialInt64 (reinterpret_cast<int64_t*> (m_sampleNumberBuffer.getData()), baseSampleNumber, numSamples);
886888

887-
m_dataTimestampFiles[fileIndex]->writeData (m_sampleNumberBuffer, numSamples * sizeof (int64));
888-
m_dataTimestampFiles[fileIndex]->increaseRecordCount (numSamples);
889+
m_dataTimestampFiles[resolvedFileIndex]->writeData (m_sampleNumberBuffer, numSamples * sizeof (int64));
890+
m_dataTimestampFiles[resolvedFileIndex]->increaseRecordCount (numSamples);
889891

890-
m_dataSyncTimestampFiles[fileIndex]->writeData (timestampBuffer, numSamples * sizeof (double));
891-
m_dataSyncTimestampFiles[fileIndex]->increaseRecordCount (numSamples);
892+
m_dataSyncTimestampFiles[resolvedFileIndex]->writeData (timestampBuffer, numSamples * sizeof (double));
893+
m_dataSyncTimestampFiles[resolvedFileIndex]->increaseRecordCount (numSamples);
892894
break;
893895
}
894896
}

Source/Processors/RecordNode/BinaryFormat/SIMDConverter.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
#include <cstdint>
3333
#include <string>
34+
#include "../../../TestableExport.h"
3435

3536
/**
3637
* SIMD-optimized float-to-int16 conversion utilities.
@@ -47,7 +48,7 @@
4748
* (FloatVectorOperations::copyWithMultiply + AudioDataConverters::convertFloatToInt16LE)
4849
* into a single pass, eliminating the intermediate buffer and improving cache utilization.
4950
*/
50-
class SIMDConverter
51+
class TESTABLE SIMDConverter
5152
{
5253
public:
5354
/**

Tests/Processors/RecordNodeTests.cpp

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -820,6 +820,97 @@ TEST_F(RecordNodeTests, WriteContinuousDataBatch_EmptyBatch_ZeroSamples_NoDataWr
820820
ASSERT_EQ(persistedData.size(), 0u);
821821
}
822822

823+
class MultiStream_RecordNodeTests : public testing::Test {
824+
protected:
825+
void SetUp() override {
826+
tester = std::make_unique<ProcessorTester>(TestSourceNodeBuilder
827+
(FakeSourceNodeParams{
828+
4, // channels per stream
829+
30000, // sample rate
830+
1.0f, // bitVolts
831+
3 // streams
832+
}));
833+
834+
parentRecordingDir = std::filesystem::temp_directory_path() / "record_node_multi_stream_tests";
835+
if (std::filesystem::exists(parentRecordingDir)) {
836+
std::filesystem::remove_all(parentRecordingDir);
837+
}
838+
std::filesystem::create_directory(parentRecordingDir);
839+
840+
tester->setRecordingParentDirectory(parentRecordingDir.string());
841+
processor = tester->createProcessor<RecordNode>(Plugin::Processor::RECORD_NODE);
842+
}
843+
844+
void TearDown() override {
845+
std::error_code ec;
846+
std::filesystem::remove_all(parentRecordingDir, ec);
847+
}
848+
849+
bool getRecordingPath(std::filesystem::path* outPath) const {
850+
auto dirIter = std::filesystem::directory_iterator(parentRecordingDir);
851+
if (dirIter == std::filesystem::directory_iterator()) {
852+
return false;
853+
}
854+
855+
auto recordingDir = dirIter->path();
856+
std::stringstream ss;
857+
ss << "Record Node " << processor->getNodeId();
858+
*outPath = recordingDir / ss.str() / "experiment1" / "recording1";
859+
return true;
860+
}
861+
862+
std::unique_ptr<ProcessorTester> tester;
863+
RecordNode* processor = nullptr;
864+
std::filesystem::path parentRecordingDir;
865+
};
866+
867+
TEST_F(MultiStream_RecordNodeTests, DisabledMiddleStream_DoesNotBlockSubsequentStreamWrites) {
868+
// Disable all channels on middle stream; keep streams 0 and 2 enabled.
869+
auto streams = processor->getDataStreams();
870+
ASSERT_EQ(streams.size(), 3);
871+
872+
Array<var> noChannelsSelected;
873+
auto* middleStreamMask = static_cast<MaskChannelsParameter*>(streams[1]->getParameter("channels"));
874+
ASSERT_NE(middleStreamMask, nullptr);
875+
middleStreamMask->setNextValue(noChannelsSelected, false);
876+
processor->updateSettings();
877+
878+
tester->startAcquisition(true);
879+
880+
AudioBuffer<float> inputBuffer(12, 256); // 3 streams * 4 channels/stream
881+
for (int ch = 0; ch < inputBuffer.getNumChannels(); ch++) {
882+
for (int s = 0; s < inputBuffer.getNumSamples(); s++) {
883+
inputBuffer.setSample(ch, s, static_cast<float>(ch * 1000 + s));
884+
}
885+
}
886+
887+
for (int i = 0; i < 5; i++) {
888+
tester->processBlock(processor, inputBuffer);
889+
}
890+
891+
tester->stopAcquisition();
892+
893+
std::filesystem::path recordingPath;
894+
ASSERT_TRUE(getRecordingPath(&recordingPath));
895+
auto continuousPath = recordingPath / "continuous";
896+
ASSERT_TRUE(std::filesystem::exists(continuousPath));
897+
898+
std::vector<std::filesystem::path> continuousDatFiles;
899+
for (const auto& entry : std::filesystem::recursive_directory_iterator(continuousPath)) {
900+
if (entry.is_regular_file() && entry.path().filename() == "continuous.dat") {
901+
continuousDatFiles.push_back(entry.path());
902+
}
903+
}
904+
905+
// Only streams with recorded channels should have files: stream 0 and stream 2.
906+
ASSERT_EQ(continuousDatFiles.size(), 2u);
907+
908+
for (const auto& datFile : continuousDatFiles) {
909+
ASSERT_GT(std::filesystem::file_size(datFile), 0u)
910+
<< "Expected non-empty data file for recorded stream: " << datFile.string();
911+
}
912+
}
913+
823914
/**
824915
* Test that writeContinuousDataBatch correctly resizes internal buffers.
825916
* This tests the buffer reallocation logic by writing progressively larger blocks.

0 commit comments

Comments
 (0)