Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions DataFormats/Detectors/TPC/include/DataFormatsTPC/CMV.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,13 @@ struct Data {
return positive ? magnitude : -magnitude;
}

// Encode from float: clamps magnitude to 15 bits, range ±255.992
// Encode from float: truncates magnitude to 15 bits, range ±255.992
void setCMVFloat(float value)
{
const bool positive = (value >= 0.f);
const uint16_t magnitude = static_cast<uint16_t>(std::abs(value) * 128.f + 0.5f) & 0x7FFF;
const uint16_t magnitude = static_cast<uint16_t>(
std::lround(std::abs(value) * 128.f)) &
0x7FFF;
cmv = (positive ? 0x8000 : 0x0000) | magnitude;
}
};
Expand Down Expand Up @@ -119,4 +121,4 @@ struct Container {

} // namespace o2::tpc::cmv

#endif
#endif
5 changes: 5 additions & 0 deletions Detectors/TPC/workflow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -309,4 +309,9 @@ o2_add_executable(cmv-aggregate
SOURCES src/tpc-aggregate-cmv.cxx
PUBLIC_LINK_LIBRARIES O2::TPCWorkflow)

o2_add_executable(cmv-trigger
COMPONENT_NAME tpc
SOURCES src/tpc-cmv-trigger.cxx
PUBLIC_LINK_LIBRARIES O2::TPCWorkflow)

add_subdirectory(readers)
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace o2::tpc

/// create a processor spec
/// convert CMV raw values to a vector in a CRU
o2::framework::DataProcessorSpec getCMVToVectorSpec(const std::string inputSpec, std::vector<uint32_t> const& crus);
o2::framework::DataProcessorSpec getCMVToVectorSpec(std::string const& inputSpec, std::vector<uint32_t> const& crus);

} // end namespace o2::tpc

Expand Down
103 changes: 50 additions & 53 deletions Detectors/TPC/workflow/include/TPCWorkflow/TPCAggregateCMVSpec.h

Large diffs are not rendered by default.

76 changes: 36 additions & 40 deletions Detectors/TPC/workflow/include/TPCWorkflow/TPCDistributeCMVSpec.h

Large diffs are not rendered by default.

121 changes: 92 additions & 29 deletions Detectors/TPC/workflow/include/TPCWorkflow/TPCFLPCMVSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
/// @author Tuba Gündem, tuba.gundem@cern.ch
/// @brief TPC device for processing CMVs on FLPs

#ifndef O2_TPCFLPIDCSPEC_H
#define O2_TPCFLPIDCSPEC_H
#ifndef O2_TPCFLPCMVSPEC_H
#define O2_TPCFLPCMVSPEC_H

#include <vector>
#include <unordered_map>
Expand All @@ -28,12 +28,9 @@
#include "Headers/DataHeader.h"
#include "TPCWorkflow/ProcessingHelpers.h"
#include "TPCBase/CRU.h"
#include "DataFormatsTPC/CMV.h"
#include "TFile.h"

using namespace o2::framework;
using o2::header::gDataOriginTPC;
using namespace o2::tpc;

namespace o2::tpc
{

Expand All @@ -46,6 +43,12 @@ class TPCFLPCMVDevice : public o2::framework::Task
void init(o2::framework::InitContext& ic) final
{
mDumpCMVs = ic.options().get<bool>("dump-cmvs-flp");
mEnableTrigger = ic.options().get<bool>("trigger");
mTriggerThresholdCMV = ic.options().get<float>("trigger-threshold-cmv");
mTriggerThresholdMeanMax = ic.options().get<float>("trigger-threshold-cmvMeanMax");
mTriggerThresholdMeanMin = ic.options().get<float>("trigger-threshold-cmvMeanMin");
mTriggerTimebinMin = ic.options().get<int>("trigger-threshold-timebinMin");
mTriggerTimebinMax = ic.options().get<int>("trigger-threshold-timebinMax");
}

void run(o2::framework::ProcessingContext& pc) final
Expand All @@ -56,7 +59,7 @@ class TPCFLPCMVDevice : public o2::framework::Task

// Capture heartbeatOrbit / heartbeatBC from the first TF in the buffer
if (mCountTFsForBuffer == 1) {
for (auto& ref : InputRecordWalker(pc.inputs(), mOrbitFilter)) {
for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mOrbitFilter)) {
auto const* hdr = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
const uint32_t cru = hdr->subSpecification >> 7;
if (mFirstOrbitBC.find(cru) == mFirstOrbitBC.end()) {
Expand All @@ -68,12 +71,18 @@ class TPCFLPCMVDevice : public o2::framework::Task
}
}

for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) {
bool triggered = false;
for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mFilter)) {
auto const* tpcCRUHeader = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
const int cru = tpcCRUHeader->subSpecification >> 7;
const uint32_t cru = tpcCRUHeader->subSpecification >> 7;
auto vecCMVs = pc.inputs().get<o2::pmr::vector<uint16_t>>(ref);
mCMVs[cru].insert(mCMVs[cru].end(), vecCMVs.begin(), vecCMVs.end());

const bool cruTriggered = mEnableTrigger && evaluateTrigger(vecCMVs);
triggered |= cruTriggered;
}
const header::DataHeader::SubSpecificationType trigSubSpec{mCRUs.front() << 7};
pc.outputs().snapshot(o2::framework::Output{o2::header::gDataOriginTPC, getDataDescriptionCMVTrigger(), trigSubSpec}, triggered);

if (mCountTFsForBuffer >= mNTFsBuffer) {
mCountTFsForBuffer = 0;
Expand All @@ -86,7 +95,7 @@ class TPCFLPCMVDevice : public o2::framework::Task

if (mDumpCMVs) {
TFile fOut(fmt::format("CMVs_{}_tf_{}.root", mLane, processing_helpers::getCurrentTF(pc)).data(), "RECREATE");
for (auto& ref : InputRecordWalker(pc.inputs(), mFilter)) {
for (auto& ref : o2::framework::InputRecordWalker(pc.inputs(), mFilter)) {
auto const* tpcCRUHeader = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
const int cru = tpcCRUHeader->subSpecification >> 7;
auto vec = pc.inputs().get<std::vector<uint16_t>>(ref);
Expand All @@ -103,14 +112,17 @@ class TPCFLPCMVDevice : public o2::framework::Task
sendOutput(ec.outputs(), cru);
}
}
ec.services().get<ControlService>().readyToQuit(QuitRequest::Me);
ec.services().get<o2::framework::ControlService>().readyToQuit(o2::framework::QuitRequest::Me);
}

static constexpr header::DataDescription getDataDescriptionCMVGroup() { return header::DataDescription{"CMVGROUP"}; }

/// Data description for the packed (orbit<<32|bc) scalar forwarded alongside each CRU's CMVGROUP.
static constexpr header::DataDescription getDataDescriptionCMVOrbitInfo() { return header::DataDescription{"CMVORBITINFO"}; }

/// Data description for the per-CRU per-TF trigger flag (empty span = not triggered or disabled; {1} = triggered).
static constexpr header::DataDescription getDataDescriptionCMVTrigger() { return header::DataDescription{"CMVTRIGGER"}; }

private:
const int mLane{}; ///< lane number of processor
const std::vector<uint32_t> mCRUs{}; ///< CRUs to process in this instance
Expand All @@ -119,13 +131,53 @@ class TPCFLPCMVDevice : public o2::framework::Task
int mCountTFsForBuffer{0}; ///< counts TFs to track when to send output
std::unordered_map<unsigned int, o2::pmr::vector<uint16_t>> mCMVs{}; ///< buffered raw 16-bit CMV values per CRU
std::unordered_map<uint32_t, uint64_t> mFirstOrbitBC{}; ///< first packed orbit/BC per CRU for the current buffer window
bool mEnableTrigger{false}; ///< enable CMV trigger evaluation
float mTriggerThresholdCMV{-10.f}; ///< CMV value threshold: trigger sequence starts when value drops below this
float mTriggerThresholdMeanMax{-40.f}; ///< upper bound on trigger-sequence mean CMV value
float mTriggerThresholdMeanMin{-80.f}; ///< lower bound on trigger-sequence mean CMV value
int mTriggerTimebinMin{4}; ///< minimum trigger-sequence length (timebins) to accept
int mTriggerTimebinMax{-1}; ///< maximum trigger-sequence length (timebins) to accept; -1 disables

/// Filter for CMV float vectors (one CMVVECTOR message per CRU per TF)
const std::vector<InputSpec> mFilter = {{"cmvs", ConcreteDataTypeMatcher{gDataOriginTPC, "CMVVECTOR"}, Lifetime::Timeframe}};
const std::vector<o2::framework::InputSpec> mFilter = {{"cmvs", o2::framework::ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, "CMVVECTOR"}, o2::framework::Lifetime::Timeframe}};
/// Filter for CMV packet timing info (one CMVORBITS message per CRU per TF, sent by CMVToVectorSpec)
const std::vector<InputSpec> mOrbitFilter = {{"cmvorbits", ConcreteDataTypeMatcher{gDataOriginTPC, "CMVORBITS"}, Lifetime::Timeframe}};
const std::vector<o2::framework::InputSpec> mOrbitFilter = {{"cmvorbits", o2::framework::ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, "CMVORBITS"}, o2::framework::Lifetime::Timeframe}};

// Scan a CRU's CMV vector for contiguous below-threshold sequences.
// Returns true as soon as one sequence satisfies both the length and mean criteria.
bool evaluateTrigger(const o2::pmr::vector<uint16_t>& cmvs) const
{
float seqSum = 0.f;
int seqLen = 0;

auto checkSequence = [&]() -> bool {
if (seqLen == 0) {
return false;
}
const float mean = seqSum / seqLen;
return (seqLen >= mTriggerTimebinMin) &&
(mTriggerTimebinMax < 0 || seqLen <= mTriggerTimebinMax) &&
(mean >= mTriggerThresholdMeanMin) &&
(mean <= mTriggerThresholdMeanMax);
};

for (const auto raw : cmvs) {
const float val = cmv::Data{raw}.getCMVFloat();
if (val < mTriggerThresholdCMV) {
seqSum += val;
++seqLen;
} else {
if (checkSequence()) {
return true;
}
seqLen = 0;
seqSum = 0.f;
}
}
return checkSequence(); // trailing sequence that reached end of buffer
}

void sendOutput(DataAllocator& output, const uint32_t cru)
void sendOutput(o2::framework::DataAllocator& output, const uint32_t cru)
{
const header::DataHeader::SubSpecificationType subSpec{cru << 7};

Expand All @@ -134,39 +186,50 @@ class TPCFLPCMVDevice : public o2::framework::Task
if (auto it = mFirstOrbitBC.find(cru); it != mFirstOrbitBC.end()) {
orbitBC = it->second;
}
output.snapshot(Output{gDataOriginTPC, getDataDescriptionCMVOrbitInfo(), subSpec}, orbitBC);
output.snapshot(o2::framework::Output{o2::header::gDataOriginTPC, getDataDescriptionCMVOrbitInfo(), subSpec}, orbitBC);

output.adoptContainer(Output{gDataOriginTPC, getDataDescriptionCMVGroup(), subSpec}, std::move(mCMVs[cru]));
output.adoptContainer(o2::framework::Output{o2::header::gDataOriginTPC, getDataDescriptionCMVGroup(), subSpec}, std::move(mCMVs[cru]));
}
};

DataProcessorSpec getTPCFLPCMVSpec(const int ilane, const std::vector<uint32_t>& crus, const int nTFsBuffer = 1)
o2::framework::DataProcessorSpec getTPCFLPCMVSpec(const int ilane, const std::vector<uint32_t>& crus, const int nTFsBuffer = 1)
{
std::vector<OutputSpec> outputSpecs;
std::vector<InputSpec> inputSpecs;
outputSpecs.reserve(crus.size());
inputSpecs.reserve(crus.size());
std::vector<o2::framework::OutputSpec> outputSpecs;
std::vector<o2::framework::InputSpec> inputSpecs;
outputSpecs.reserve(crus.size() * 2 + 1);
inputSpecs.reserve(crus.size() * 2);

for (const auto& cru : crus) {
const header::DataHeader::SubSpecificationType subSpec{cru << 7};

// Inputs from CMVToVectorSpec
inputSpecs.emplace_back(InputSpec{"cmvs", gDataOriginTPC, "CMVVECTOR", subSpec, Lifetime::Timeframe});
inputSpecs.emplace_back(InputSpec{"cmvorbits", gDataOriginTPC, "CMVORBITS", subSpec, Lifetime::Timeframe});
inputSpecs.emplace_back(o2::framework::InputSpec{"cmvs", o2::header::gDataOriginTPC, "CMVVECTOR", subSpec, o2::framework::Lifetime::Timeframe});
inputSpecs.emplace_back(o2::framework::InputSpec{"cmvorbits", o2::header::gDataOriginTPC, "CMVORBITS", subSpec, o2::framework::Lifetime::Timeframe});

// Outputs to TPCDistributeCMVSpec
outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVGroup(), subSpec}, Lifetime::Sporadic);
outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVOrbitInfo(), subSpec}, Lifetime::Sporadic);
outputSpecs.emplace_back(o2::framework::ConcreteDataMatcher{o2::header::gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVGroup(), subSpec}, o2::framework::Lifetime::Sporadic);
outputSpecs.emplace_back(o2::framework::ConcreteDataMatcher{o2::header::gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVOrbitInfo(), subSpec}, o2::framework::Lifetime::Sporadic);
}

// Single per-FLP trigger output, subspec keyed on the first CRU
const header::DataHeader::SubSpecificationType trigSubSpec{crus.front() << 7};
outputSpecs.emplace_back(o2::framework::ConcreteDataMatcher{o2::header::gDataOriginTPC, TPCFLPCMVDevice::getDataDescriptionCMVTrigger(), trigSubSpec}, o2::framework::Lifetime::Timeframe);

const auto id = fmt::format("tpc-flp-cmv-{:02}", ilane);
return DataProcessorSpec{
return o2::framework::DataProcessorSpec{
id.data(),
inputSpecs,
outputSpecs,
AlgorithmSpec{adaptFromTask<TPCFLPCMVDevice>(ilane, crus, nTFsBuffer)},
Options{{"dump-cmvs-flp", VariantType::Bool, false, {"Dump CMVs to file"}}}};
o2::framework::AlgorithmSpec{o2::framework::adaptFromTask<TPCFLPCMVDevice>(ilane, crus, nTFsBuffer)},
o2::framework::Options{
{"dump-cmvs-flp", o2::framework::VariantType::Bool, false, {"Dump CMVs to file"}},
{"trigger", o2::framework::VariantType::Bool, false, {"Enable CMV trigger evaluation"}},
{"trigger-threshold-cmv", o2::framework::VariantType::Float, -10.f, {"CMV threshold: sequence starts when value drops below this (ADC units)"}},
{"trigger-threshold-cmvMeanMax", o2::framework::VariantType::Float, -40.f, {"Upper bound on trigger-sequence mean CMV value"}},
{"trigger-threshold-cmvMeanMin", o2::framework::VariantType::Float, -80.f, {"Lower bound on trigger-sequence mean CMV value"}},
{"trigger-threshold-timebinMin", o2::framework::VariantType::Int, 4, {"Minimum trigger-sequence length in timebins"}},
{"trigger-threshold-timebinMax", o2::framework::VariantType::Int, -1, {"Maximum trigger-sequence length in timebins (-1 disables upper bound)"}}}};
}

} // namespace o2::tpc
#endif
#endif
16 changes: 4 additions & 12 deletions Detectors/TPC/workflow/src/CMVToVectorSpec.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ class CMVToVectorDevice : public o2::framework::Task
{
const auto runNumber = processing_helpers::getRunNumber(pc);
std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}};
const auto& mapper = Mapper::instance();

// open files if necessary
if ((mWriteDebug || mWriteDebugOnError) && !mDebugStream) {
Expand All @@ -95,10 +94,7 @@ class CMVToVectorDevice : public o2::framework::Task
mRawOutputFile.open(rawFileName, std::ios::binary);
}

uint32_t heartbeatOrbit = 0;
uint16_t heartbeatBC = 0;
uint32_t tfCounter = 0;
bool first = true;
bool hasErrors = false;

for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) {
Expand Down Expand Up @@ -149,7 +145,7 @@ class CMVToVectorDevice : public o2::framework::Task
LOGP(debug, "Processing firstTForbit {:9}, tfCounter {:5}, run {:6}, feeId {:6}, cruID {:3}, link {:2}", dh->firstTForbit, dh->tfCounter, dh->runNumber, feeId, cruID, link);

if (std::find(mCRUs.begin(), mCRUs.end(), cruID) == mCRUs.end()) {
LOGP(warning, "CMV CRU {:3} not configured in CRUs, skipping", cruID);
// LOGP(debug, "CMV CRU {:3} not configured in CRUs, skipping", cruID);
continue;
}

Expand Down Expand Up @@ -204,7 +200,7 @@ class CMVToVectorDevice : public o2::framework::Task
}
}

hasErrors |= snapshotCMVs(pc.outputs(), tfCounter);
hasErrors |= snapshotCMVs(pc.outputs());

if (mWriteDebug || (mWriteDebugOnError && hasErrors)) {
writeDebugOutput(tfCounter);
Expand Down Expand Up @@ -274,7 +270,7 @@ class CMVToVectorDevice : public o2::framework::Task
std::string mRawOutputFileName; ///< name of the raw output file

//____________________________________________________________________________
bool snapshotCMVs(DataAllocator& output, uint32_t tfCounter)
bool snapshotCMVs(DataAllocator& output)
{
bool hasErrors = false;

Expand Down Expand Up @@ -321,12 +317,8 @@ class CMVToVectorDevice : public o2::framework::Task
//____________________________________________________________________________
void writeDebugOutput(uint32_t tfCounter)
{
const auto& mapper = Mapper::instance();

mDebugStream->GetFile()->cd();
auto& stream = (*mDebugStream) << "cmvs";
uint32_t seen = 0;
static uint32_t firstOrbit = std::numeric_limits<uint32_t>::max();

for (auto cru : mCRUs) {
if (mCMVInfos.find(cru) == mCMVInfos.end()) {
Expand Down Expand Up @@ -404,7 +396,7 @@ class CMVToVectorDevice : public o2::framework::Task
}
};

o2::framework::DataProcessorSpec getCMVToVectorSpec(const std::string inputSpec, std::vector<uint32_t> const& crus)
o2::framework::DataProcessorSpec getCMVToVectorSpec(std::string const& inputSpec, std::vector<uint32_t> const& crus)
{
using device = o2::tpc::CMVToVectorDevice;

Expand Down
Loading