diff --git a/Framework/Core/include/Framework/DataProcessingContext.h b/Framework/Core/include/Framework/DataProcessingContext.h index 221f7b099dc07..f66f7d7c7f8d2 100644 --- a/Framework/Core/include/Framework/DataProcessingContext.h +++ b/Framework/Core/include/Framework/DataProcessingContext.h @@ -13,6 +13,7 @@ #include "Framework/DataRelayer.h" #include "Framework/AlgorithmSpec.h" +#include #include namespace o2::framework @@ -33,7 +34,7 @@ struct DataProcessorContext { DataProcessorContext(DataProcessorContext const&) = delete; DataProcessorContext() = default; - bool allDone = false; + std::atomic allDone = false; /// Latest run number we processed globally for this DataProcessor. int64_t lastRunNumberProcessed = -1; @@ -43,7 +44,6 @@ struct DataProcessorContext { // FIXME: move stuff here from the list below... ;-) ServiceRegistry* registry = nullptr; - std::vector completed; std::vector expirationHandlers; AlgorithmSpec::InitCallback init; AlgorithmSpec::ProcessCallback statefulProcess; diff --git a/Framework/Core/include/Framework/StreamContext.h b/Framework/Core/include/Framework/StreamContext.h index 79c8ad798836a..fb29c3e164aef 100644 --- a/Framework/Core/include/Framework/StreamContext.h +++ b/Framework/Core/include/Framework/StreamContext.h @@ -11,6 +11,7 @@ #ifndef O2_FRAMEWORK_STREAMCONTEXT_H_ #define O2_FRAMEWORK_STREAMCONTEXT_H_ +#include "Framework/DataRelayer.h" #include "Framework/ServiceHandle.h" #include "ProcessingContext.h" #include "ServiceSpec.h" @@ -64,6 +65,10 @@ struct StreamContext { // the callback will be called for all of them. std::vector preStartStreamHandles; + /// Per-stream list of actions ready to be dispatched. Populated by + /// getReadyToProcess() and consumed by tryDispatchComputation(). + std::vector completed; + // Information on wether or not all the required routes have been created. // This is used to check if the LifetimeTimeframe routes were all created // for a given iteration. diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index be25133158072..ee68d0728d52e 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -187,9 +187,13 @@ DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegi // 99 is to execute DPL callbacks last this->SubscribeToStateChange("99-dpl", stateWatcher); - // One task for now. - mStreams.resize(1); - mHandles.resize(1); + auto* poolSizeEnv = getenv("DPL_THREADPOOL_SIZE"); + // 0 (or unset): synchronous execution on the main thread. + // N > 0: N concurrent async streams; I/O runs on the main thread while + // computation runs on N pool threads. + size_t numStreams = poolSizeEnv ? std::max(0, std::atoi(poolSizeEnv)) : 0; + mStreams.resize(std::max(numStreams, 1UL)); + mHandles.resize(std::max(numStreams, 1UL)); ServiceRegistryRef ref{mServiceRegistry}; @@ -1210,10 +1214,8 @@ void DataProcessingDevice::Run() O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop); O2_SIGNPOST_START(device, lid, "device_state", "First iteration of the device loop"); - bool dplEnableMultithreding = getenv("DPL_THREADPOOL_SIZE") != nullptr; - if (dplEnableMultithreding) { - setenv("UV_THREADPOOL_SIZE", "1", 1); - } + auto* poolSizeEnv = getenv("DPL_THREADPOOL_SIZE"); + bool dplEnableMultithreding = poolSizeEnv && std::atoi(poolSizeEnv) > 0; while (state.transitionHandling != TransitionHandlingState::Expired) { if (state.nextFairMQState.empty() == false) { @@ -1634,6 +1636,7 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref) void DataProcessingDevice::doRun(ServiceRegistryRef ref) { auto& context = ref.get(); + auto& streamContext = ref.get(); O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context); auto& state = ref.get(); auto& spec = ref.get(); @@ -1642,9 +1645,9 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) return; } - context.completed.clear(); - context.completed.reserve(16); - if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) { + streamContext.completed.clear(); + streamContext.completed.reserve(16); + if (DataProcessingDevice::tryDispatchComputation(ref, streamContext.completed)) { state.lastActiveDataProcessor.store(&context); } DanglingContext danglingContext{*context.registry}; @@ -1658,8 +1661,8 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) state.lastActiveDataProcessor = &context; } - context.completed.clear(); - if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) { + streamContext.completed.clear(); + if (DataProcessingDevice::tryDispatchComputation(ref, streamContext.completed)) { state.lastActiveDataProcessor = &context; } @@ -1685,7 +1688,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) bool shouldProcess = DataProcessingHelpers::hasOnlyGenerated(spec) == false; - while (DataProcessingDevice::tryDispatchComputation(ref, context.completed) && shouldProcess) { + while (DataProcessingDevice::tryDispatchComputation(ref, streamContext.completed) && shouldProcess) { relayer.processDanglingInputs(context.expirationHandlers, *context.registry, false); }