From 9808d50fe40d40a42bf866d3cc73b35b3b307e86 Mon Sep 17 00:00:00 2001 From: tubagundem Date: Thu, 5 Feb 2026 14:01:45 +0100 Subject: [PATCH 1/3] First commit of processing common mode values in O2 --- .../TPC/include/DataFormatsTPC/CMV.h | 96 ++++ Detectors/TPC/base/include/TPCBase/RDHUtils.h | 1 + Detectors/TPC/workflow/CMakeLists.txt | 21 + .../include/TPCWorkflow/CMVToVectorSpec.h | 31 ++ .../TPC/workflow/src/CMVToVectorSpec.cxx | 439 ++++++++++++++++++ .../TPC/workflow/src/tpc-cmv-to-vector.cxx | 72 +++ 6 files changed, 660 insertions(+) create mode 100644 DataFormats/Detectors/TPC/include/DataFormatsTPC/CMV.h create mode 100644 Detectors/TPC/workflow/include/TPCWorkflow/CMVToVectorSpec.h create mode 100644 Detectors/TPC/workflow/src/CMVToVectorSpec.cxx create mode 100644 Detectors/TPC/workflow/src/tpc-cmv-to-vector.cxx diff --git a/DataFormats/Detectors/TPC/include/DataFormatsTPC/CMV.h b/DataFormats/Detectors/TPC/include/DataFormatsTPC/CMV.h new file mode 100644 index 0000000000000..38ebe04e1961d --- /dev/null +++ b/DataFormats/Detectors/TPC/include/DataFormatsTPC/CMV.h @@ -0,0 +1,96 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// @file CMV.h +/// @author Tuba Gündem, tuba.gundem@cern.ch +/// @brief Common mode values data format definition + +/// The data is sent by the CRU as 96 bit words. The CMV data layout is as follows: +/// - 80-bit Header: [version:8][packetID:8][errorCode:8][magicWord:8][heartbeatOrbit:32][heartbeatBC:16] +/// - 16-bit CMV value: [CMV:16] + +#ifndef ALICEO2_DATAFORMATSTPC_CMV_H +#define ALICEO2_DATAFORMATSTPC_CMV_H + +#include + +namespace o2::tpc::cmv +{ + +static constexpr uint32_t NTimeBins = 3564; ///< number of time bins (spans 8 orbits) +static constexpr uint32_t SignificantBits = 2; ///< number of bits used for floating point precision +static constexpr float FloatConversion = 1.f / float(1 << SignificantBits); ///< conversion factor from integer representation to float + +/// Header definition of the CMVs +struct Header { + static constexpr uint8_t MagicWord = 0xDC; + union { + uint32_t word0 = 0; ///< bits 0 - 31 + struct { + uint8_t version : 8; ///< version + uint8_t packetID : 8; ///< packet id + uint8_t errorCode : 8; ///< errors + uint8_t magicWord : 8; ///< magic word + }; + }; + union { + uint32_t word1 = 0; ///< bits 32 - 63 + struct { + uint32_t heartbeatOrbit : 32; ///< first heart beat timing of the package + }; + }; + union { + uint16_t word2 = 0; ///< bits 64 - 79 + struct { + uint16_t heartbeatBC : 16; ///< first BC id of the package + }; + }; +}; + +/// CMV single data container +struct Data { + uint16_t CMV{0}; ///< 16bit ADC value + + // Raw integer accessors + uint16_t getCMV() const { return CMV; } + void setCMV(uint16_t value) { CMV = value; } + + // Float helpers using SignificantBits for fixed-point conversion + float getCMVFloat() const { return static_cast(CMV) * FloatConversion; } + void setCMVFloat(float value) { + // round to nearest representable fixed-point value + setCMV(uint32_t((value + 0.5f * FloatConversion) / FloatConversion)); + } +}; + +/// CMV full data container: one packet carries NTimeBins time bins +struct Container { + Header header; ///< CMV data header + Data data[NTimeBins]; ///< data values for given number of time bins + + // Header and data accessors + const Header& getHeader() const { return header; } + Header& getHeader() { return header; } + + const Data* getData() const { return data; } + Data* getData() { return data; } + + // Per-time-bin CMV accessors + uint16_t getCMV(uint32_t timeBin) const { return data[timeBin].getCMV(); } + void setCMV(uint32_t timeBin, uint16_t value) { data[timeBin].setCMV(value); } + + float getCMVFloat(uint32_t timeBin) const { return data[timeBin].getCMVFloat(); } + void setCMVFloat(uint32_t timeBin, float value) { data[timeBin].setCMVFloat(value); } + +}; +} // namespace o2::tpc::cmv + +#endif diff --git a/Detectors/TPC/base/include/TPCBase/RDHUtils.h b/Detectors/TPC/base/include/TPCBase/RDHUtils.h index adfd94cf6b703..ea4e1fe792a39 100644 --- a/Detectors/TPC/base/include/TPCBase/RDHUtils.h +++ b/Detectors/TPC/base/include/TPCBase/RDHUtils.h @@ -28,6 +28,7 @@ static constexpr FEEIDType UserLogicLinkID = 15; ///< virtual link ID for ZS dat static constexpr FEEIDType IDCLinkID = 20; ///< Identifier for integrated digital currents static constexpr FEEIDType ILBZSLinkID = 21; ///< Identifier for improved link-based ZS static constexpr FEEIDType DLBZSLinkID = 22; ///< Identifier for dense link-based ZS +static constexpr FEEIDType CMVLinkID = 23; ///< Identifier for common mode values static constexpr FEEIDType SACLinkID = 25; ///< Identifier for sampled analog currents /// compose feeid from cru, endpoint and link diff --git a/Detectors/TPC/workflow/CMakeLists.txt b/Detectors/TPC/workflow/CMakeLists.txt index 6930f332bfbf1..92c35f399552e 100644 --- a/Detectors/TPC/workflow/CMakeLists.txt +++ b/Detectors/TPC/workflow/CMakeLists.txt @@ -25,6 +25,7 @@ o2_add_library(TPCWorkflow src/KryptonRawFilterSpec.cxx src/OccupancyFilterSpec.cxx src/SACProcessorSpec.cxx + src/CMVToVectorSpec.cxx src/IDCToVectorSpec.cxx src/CalibdEdxSpec.cxx src/CalibratordEdxSpec.cxx @@ -289,3 +290,23 @@ o2_add_executable(pressure-temperature PUBLIC_LINK_LIBRARIES O2::TPCWorkflow) add_subdirectory(readers) + +o2_add_executable(cmv-to-vector + COMPONENT_NAME tpc + SOURCES src/tpc-cmv-to-vector.cxx + PUBLIC_LINK_LIBRARIES O2::TPCWorkflow) + +# o2_add_executable(cmv-flp +# COMPONENT_NAME tpc +# SOURCES src/tpc-flp-cmv.cxx +# PUBLIC_LINK_LIBRARIES O2::TPCWorkflow) + +# o2_add_executable(cmv-distribute +# COMPONENT_NAME tpc +# SOURCES src/tpc-distribute-cmv.cxx +# PUBLIC_LINK_LIBRARIES O2::TPCWorkflow) + +# o2_add_executable(cmv-producer +# COMPONENT_NAME tpc +# SOURCES src/tpc-cmv-producer.cxx +# PUBLIC_LINK_LIBRARIES O2::TPCWorkflow) \ No newline at end of file diff --git a/Detectors/TPC/workflow/include/TPCWorkflow/CMVToVectorSpec.h b/Detectors/TPC/workflow/include/TPCWorkflow/CMVToVectorSpec.h new file mode 100644 index 0000000000000..179985cea969c --- /dev/null +++ b/Detectors/TPC/workflow/include/TPCWorkflow/CMVToVectorSpec.h @@ -0,0 +1,31 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// @file CMVToVectorSpec.h +/// @author Tuba Gündem, tuba.gundem@cern.ch +/// @brief Processor to convert CMVs to a vector in a CRU + +#ifndef TPC_CMVToVectorSpec_H_ +#define TPC_CMVToVectorSpec_H_ + +#include "Framework/DataProcessorSpec.h" +#include + +namespace o2::tpc +{ + +/// create a processor spec +/// convert CMV raw values to a vector in a CRU +o2::framework::DataProcessorSpec getCMVToVectorSpec(const std::string inputSpec, std::vector const& crus); + +} // end namespace o2::tpc + +#endif //TPC_CMVToVectorSpec_H_ diff --git a/Detectors/TPC/workflow/src/CMVToVectorSpec.cxx b/Detectors/TPC/workflow/src/CMVToVectorSpec.cxx new file mode 100644 index 0000000000000..03725ef7c4988 --- /dev/null +++ b/Detectors/TPC/workflow/src/CMVToVectorSpec.cxx @@ -0,0 +1,439 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// @file CMVToVectorSpec.cxx +/// @author Tuba Gündem, tuba.gundem@cern.ch +/// @brief Processor to convert CMVs to a vector in a CRU + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "TFile.h" +#include "DetectorsRaw/RDHUtils.h" +#include "Framework/Task.h" +#include "Framework/ControlService.h" +#include "Framework/ConfigParamRegistry.h" +#include "Framework/Logger.h" +#include "Framework/DataProcessorSpec.h" +#include "Framework/WorkflowSpec.h" +#include "Framework/InputRecordWalker.h" +#include "Framework/DataRefUtils.h" +#include "DPLUtils/RawParser.h" +#include "Headers/DataHeader.h" +#include "Headers/DataHeaderHelpers.h" +#include "CommonUtils/TreeStreamRedirector.h" +#include "CommonUtils/NameConf.h" +#include "DataFormatsTPC/Constants.h" +#include "CommonConstants/LHCConstants.h" +#include "CCDB/BasicCCDBManager.h" + +#include "DataFormatsTPC/Defs.h" +#include "DataFormatsTPC/CMV.h" +#include "DataFormatsTPC/RawDataTypes.h" +#include "TPCBase/Utils.h" +#include "TPCBase/RDHUtils.h" +#include "TPCBase/Mapper.h" +#include "TPCWorkflow/ProcessingHelpers.h" + +using namespace o2::framework; +using o2::constants::lhc::LHCMaxBunches; +using o2::header::gDataOriginTPC; +using o2::tpc::constants::LHCBCPERTIMEBIN; +using RDHUtils = o2::raw::RDHUtils; +using RawDataType = o2::tpc::raw_data_types::Type; + +namespace o2::tpc +{ + +class CMVToVectorDevice : public o2::framework::Task +{ + public: + using FEEIDType = rdh_utils::FEEIDType; + CMVToVectorDevice(const std::vector& crus) : mCRUs(crus) {} + + void init(o2::framework::InitContext& ic) final + { + // set up ADC value filling + mWriteDebug = ic.options().get("write-debug"); + mWriteDebugOnError = ic.options().get("write-debug-on-error"); + mWriteRawDataOnError = ic.options().get("write-raw-data-on-error"); + mRawDataType = ic.options().get("raw-data-type"); + o2::framework::RawParser<>::setCheckIncompleteHBF(ic.options().get("check-incomplete-hbf")); + + mDebugStreamFileName = ic.options().get("debug-file-name").data(); + mRawOutputFileName = ic.options().get("raw-file-name").data(); + + initCMV(); + } + + void run(o2::framework::ProcessingContext& pc) final + { + const auto runNumber = processing_helpers::getRunNumber(pc); + std::vector filter = {{"check", ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}}; + const auto& mapper = Mapper::instance(); + + // open files if necessary + if ((mWriteDebug || mWriteDebugOnError) && !mDebugStream) { + const auto debugFileName = fmt::format(fmt::runtime(mDebugStreamFileName), fmt::arg("run", runNumber)); + LOGP(info, "creating debug stream {}", debugFileName); + mDebugStream = std::make_unique(debugFileName.data(), "recreate"); + } + + if (mWriteRawDataOnError && !mRawOutputFile.is_open()) { + std::string_view rawType = (mRawDataType < 2) ? "tf" : "raw"; + if (mRawDataType == 5) { + rawType = "cmv.raw"; + } + const auto rawFileName = fmt::format(fmt::runtime(mRawOutputFileName), fmt::arg("run", runNumber), fmt::arg("raw_type", rawType)); + LOGP(info, "creating raw debug file {}", rawFileName); + mRawOutputFile.open(rawFileName, std::ios::binary); + } + + uint32_t heartbeatOrbit = 0; + uint16_t heartbeatBC = 0; + uint32_t tfCounter = 0; + bool first = true; + bool hasErrors = false; + + for (auto const& ref : InputRecordWalker(pc.inputs(), filter)) { + const auto* dh = DataRefUtils::getHeader(ref); + tfCounter = dh->tfCounter; + const auto subSpecification = dh->subSpecification; + auto payloadSize = DataRefUtils::getPayloadSize(ref); + + // ---| data loop |--- + const gsl::span raw = pc.inputs().get>(ref); + try { + o2::framework::RawParser parser(raw.data(), raw.size()); + size_t lastErrorCount = 0; + + for (auto it = parser.begin(), end = parser.end(); it != end; ++it) { + const auto size = it.size(); + + if (parser.getNErrors() > lastErrorCount) { + lastErrorCount = parser.getNErrors(); + hasErrors = true; + } + + // skip empty packages (HBF open) + if (size == 0) { + continue; + } + + auto rdhPtr = reinterpret_cast(it.raw()); + const auto rdhVersion = RDHUtils::getVersion(rdhPtr); + if (!rdhPtr || rdhVersion < 6) { + throw std::runtime_error(fmt::format("could not get RDH from packet, or version {} < 6", rdhVersion).data()); + } + + // ---| extract hardware information to do the processing |--- + const auto feeId = (FEEIDType)RDHUtils::getFEEID(*rdhPtr); + const auto link = rdh_utils::getLink(feeId); + const uint32_t cruID = rdh_utils::getCRU(feeId); + const auto detField = RDHUtils::getDetectorField(*rdhPtr); + + if ((detField != (decltype(detField))RawDataType::CMV) || (link != rdh_utils::CMVLinkID)) { + continue; + } + LOGP(debug, "CMV Processing firstTForbit {:9}, tfCounter {:5}, run {:6}, feeId {:6} ({:3}/-/{:2})", dh->firstTForbit, dh->tfCounter, dh->runNumber, feeId, cruID, link); + + if (std::find(mCRUs.begin(), mCRUs.end(), cruID) == mCRUs.end()) { + LOGP(error, "CMV CRU {:3} not configured in CRUs, skipping", cruID); + continue; + } + + auto& cmvVec = mCMVvectors[cruID]; + auto& infoVec = mCMVInfos[cruID]; + + assert(size == sizeof(cmv::Container)); + auto data = it.data(); + auto& cmvs = *((cmv::Container*)(data)); + const uint32_t orbit = cmvs.header.heartbeatOrbit; + const uint16_t bc = cmvs.header.heartbeatBC; + + // record packet meta and append its CMV vector (3564 TB) + infoVec.emplace_back(orbit, bc); + cmvVec.reserve(cmvVec.size() + cmv::NTimeBins); + for (uint32_t tb = 0; tb < cmv::NTimeBins; ++tb) { + cmvVec.push_back(cmvs.getCMVFloat(tb)); + } + } + } catch (const std::exception& e) { + // error message throtteling + using namespace std::literals::chrono_literals; + static std::unordered_map nErrorPerSubspec; + static std::chrono::time_point lastReport = std::chrono::steady_clock::now(); + const auto now = std::chrono::steady_clock::now(); + static size_t reportedErrors = 0; + const size_t MAXERRORS = 10; + const auto sleepTime = 10min; + ++nErrorPerSubspec[subSpecification]; + + if ((now - lastReport) < sleepTime) { + if (reportedErrors < MAXERRORS) { + ++reportedErrors; + std::string sleepInfo; + if (reportedErrors == MAXERRORS) { + sleepInfo = fmt::format(", maximum error count ({}) reached, not reporting for the next {}", MAXERRORS, sleepTime); + } + LOGP(alarm, "EXCEPTION in processRawData: {} -> skipping part:{}/{} of spec:{}/{}/{}, size:{}, error count for subspec: {}{}", e.what(), dh->splitPayloadIndex, dh->splitPayloadParts, + dh->dataOrigin, dh->dataDescription, subSpecification, payloadSize, nErrorPerSubspec.at(subSpecification), sleepInfo); + lastReport = now; + } + } else { + lastReport = now; + reportedErrors = 0; + } + continue; + } + } + + hasErrors |= snapshotCMVs(pc.outputs(), tfCounter); + + if (mWriteDebug || (mWriteDebugOnError && hasErrors)) { + writeDebugOutput(tfCounter); + } + + if (mWriteRawDataOnError && hasErrors) { + writeRawData(pc.inputs()); + } + + // clear output + initCMV(); + } + + void closeFiles() + { + LOGP(info, "closeFiles"); + + if (mDebugStream) { + // set some default aliases + auto& stream = (*mDebugStream) << "cmvs"; + auto& tree = stream.getTree(); + tree.SetAlias("sector", "int(cru/10)"); + mDebugStream->Close(); + mDebugStream.reset(nullptr); + mRawOutputFile.close(); + } + } + + void stop() final + { + LOGP(info, "stop"); + closeFiles(); + } + + void endOfStream(o2::framework::EndOfStreamContext& ec) final + { + LOGP(info, "endOfStream"); + // ec.services().get().readyToQuit(QuitRequest::Me); + closeFiles(); + } + + private: + /// CMV information for each cru + struct CMVInfo { + CMVInfo() = default; + CMVInfo(const CMVInfo&) = default; + CMVInfo(uint32_t orbit, uint16_t bc) : heartbeatOrbit(orbit), heartbeatBC(bc) {} + + uint32_t heartbeatOrbit{0}; + uint16_t heartbeatBC{0}; + + bool operator==(const uint32_t orbit) const { return (heartbeatOrbit == orbit); } + bool operator==(const CMVInfo& inf) const { return (inf.heartbeatOrbit == heartbeatOrbit) && (inf.heartbeatBC == heartbeatBC); } + bool matches(uint32_t orbit, int16_t bc) const { return ((heartbeatOrbit == orbit) && (heartbeatBC == bc)); } + }; + + int mRawDataType{0}; ///< type of raw data to dump in case of errors + bool mWriteDebug{false}; ///< write a debug output + bool mWriteDebugOnError{false}; ///< write a debug output in case of errors + bool mWriteRawDataOnError{false}; ///< write raw data in case of errors + std::vector mCRUs; ///< CRUs expected for this device + std::unordered_map> mCMVvectors; ///< decoded CMVs per cru over all CMV packets in the TF + std::unordered_map> mCMVInfos; ///< CMV packet information within the TF + std::string mDebugStreamFileName; ///< name of the debug stream output file + std::unique_ptr mDebugStream; ///< debug output streamer + std::ofstream mRawOutputFile; ///< raw output file + std::string mRawOutputFileName; ///< name of the raw output file + + //____________________________________________________________________________ + bool snapshotCMVs(DataAllocator& output, uint32_t tfCounter) + { + LOGP(debug, "snapshotCMVs"); + + bool hasErrors = false; + + // send data per CRU with its own orbit/BC vector + for (auto& [cru, cmvVec] : mCMVvectors) { + const header::DataHeader::SubSpecificationType subSpec{cru << 7}; + const auto& infVec = mCMVInfos[cru]; + + if (infVec.size() != 4) { + LOGP(warning, "CMV CRU {:3}: expected 4 packets per TF, got {}", cru, infVec.size()); + hasErrors = true; + } + if (cmvVec.size() != cmv::NTimeBins * infVec.size()) { + LOGP(warning, "CMV CRU {:3}: vector size {} does not match expected {}", cru, cmvVec.size(), cmv::NTimeBins * infVec.size()); + hasErrors = true; + } + + std::vector orbitBCInfo; + orbitBCInfo.reserve(infVec.size()); + for (const auto& inf : infVec) { + orbitBCInfo.emplace_back((uint64_t(inf.heartbeatOrbit) << 32) + uint64_t(inf.heartbeatBC)); + } + + LOGP(debug, "Sending CMVs for CRU {} of size {} ({} packets)", cru, cmvVec.size(), infVec.size()); + output.snapshot(Output{gDataOriginTPC, "CMVVECTOR", subSpec}, cmvVec); + output.snapshot(Output{gDataOriginTPC, "CMVORBITS", subSpec}, orbitBCInfo); + } + + return hasErrors; + } + + //____________________________________________________________________________ + void initCMV() + { + for (const auto cruID : mCRUs) { + auto& cmvVec = mCMVvectors[cruID]; + cmvVec.clear(); + + auto& infosCRU = mCMVInfos[cruID]; + infosCRU.clear(); + } + } + + //____________________________________________________________________________ + void writeDebugOutput(uint32_t tfCounter) + { + const auto& mapper = Mapper::instance(); + + mDebugStream->GetFile()->cd(); + auto& stream = (*mDebugStream) << "cmvs"; + uint32_t seen = 0; + static uint32_t firstOrbit = std::numeric_limits::max(); + + for (auto cru : mCRUs) { + if (mCMVInfos.find(cru) == mCMVInfos.end()) { + continue; + } + + auto& infos = mCMVInfos[cru]; + auto& cmvVec = mCMVvectors[cru]; + + stream << "cru=" << cru + << "tfCounter=" << tfCounter + << "nCMVs=" << cmvVec.size() + << "cmvs=" << cmvVec + << "\n"; + } + + } + + void writeRawData(InputRecord& inputs) + { + if (!mRawOutputFile.is_open()) { + return; + } + + using DataHeader = o2::header::DataHeader; + + std::vector filter = {{"check", ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, "RAWDATA"}, Lifetime::Timeframe}}; + for (auto const& ref : InputRecordWalker(inputs, filter)) { + auto dh = DataRefUtils::getHeader(ref); + // LOGP(info, "write header: {}/{}/{}, payload size: {} / {}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dh->payloadSize, ref.payloadSize); + if (((mRawDataType == 1) || (mRawDataType == 3)) && (dh->payloadSize == 2 * sizeof(o2::header::RAWDataHeader))) { + continue; + } + + if (mRawDataType < 2) { + mRawOutputFile.write(ref.header, sizeof(DataHeader)); + } + if (mRawDataType < 5) { + mRawOutputFile.write(ref.payload, ref.payloadSize); + } + + if (mRawDataType == 5) { + const gsl::span raw = inputs.get>(ref); + try { + o2::framework::RawParser parser(raw.data(), raw.size()); + for (auto it = parser.begin(), end = parser.end(); it != end; ++it) { + const auto size = it.size(); + // skip empty packages (HBF open) + if (size == 0) { + continue; + } + + auto rdhPtr = reinterpret_cast(it.raw()); + const auto rdhVersion = RDHUtils::getVersion(rdhPtr); + if (!rdhPtr || rdhVersion < 6) { + throw std::runtime_error(fmt::format("could not get RDH from packet, or version {} < 6", rdhVersion).data()); + } + + // ---| extract hardware information to do the processing |--- + const auto feeId = (FEEIDType)RDHUtils::getFEEID(*rdhPtr); + const auto link = rdh_utils::getLink(feeId); + const auto detField = RDHUtils::getDetectorField(*rdhPtr); + + // only select CMVs + if ((detField != (decltype(detField))RawDataType::CMV) || (link != rdh_utils::CMVLinkID)) { + continue; + } + + // write out raw data + mRawOutputFile.write((const char*)it.raw(), RDHUtils::getMemorySize(rdhPtr)); + } + } catch (...) { + } + } + } + } +}; + +o2::framework::DataProcessorSpec getCMVToVectorSpec(const std::string inputSpec, std::vector const& crus) +{ + using device = o2::tpc::CMVToVectorDevice; + + std::vector outputs; + for (const uint32_t cru : crus) { + const header::DataHeader::SubSpecificationType subSpec{cru << 7}; + outputs.emplace_back(gDataOriginTPC, "CMVVECTOR", subSpec, Lifetime::Timeframe); + outputs.emplace_back(gDataOriginTPC, "CMVORBITS", subSpec, Lifetime::Timeframe); + } + + return DataProcessorSpec{ + fmt::format("tpc-cmv-to-vector"), + select(inputSpec.data()), + outputs, + AlgorithmSpec{adaptFromTask(crus)}, + Options{ + {"write-debug", VariantType::Bool, false, {"write a debug output tree"}}, + {"write-debug-on-error", VariantType::Bool, false, {"write a debug output tree in case errors occurred"}}, + {"debug-file-name", VariantType::String, "/tmp/cmv_vector_debug.{run}.root", {"name of the debug output file"}}, + {"write-raw-data-on-error", VariantType::Bool, false, {"dump raw data in case errors occurred"}}, + {"raw-file-name", VariantType::String, "/tmp/cmv_debug.{run}.{raw_type}", {"name of the raw output file"}}, + {"raw-data-type", VariantType::Int, 0, {"Which raw data to dump: 0-full TPC with DH, 1-full TPC with DH skip empty, 2-full TPC no DH, 3-full TPC no DH skip empty, 4-IDC raw only 5-CMV raw only"}}, + {"check-incomplete-hbf", VariantType::Bool, false, {"false: don't chck; true: check and report"}}, + } // end Options + }; // end DataProcessorSpec +} +} // namespace o2::tpc \ No newline at end of file diff --git a/Detectors/TPC/workflow/src/tpc-cmv-to-vector.cxx b/Detectors/TPC/workflow/src/tpc-cmv-to-vector.cxx new file mode 100644 index 0000000000000..80ab0ce2f96b8 --- /dev/null +++ b/Detectors/TPC/workflow/src/tpc-cmv-to-vector.cxx @@ -0,0 +1,72 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#include +#include +#include + +#include "Algorithm/RangeTokenizer.h" +#include "Framework/WorkflowSpec.h" +#include "Framework/ControlService.h" +#include "Framework/ConfigParamSpec.h" +#include "Framework/CompletionPolicy.h" +#include "Framework/CompletionPolicyHelpers.h" +#include "CommonUtils/ConfigurableParam.h" +#include "TPCBase/CRU.h" +#include "TPCWorkflow/CMVToVectorSpec.h" + +using namespace o2::framework; +using namespace o2::tpc; + +// customize the completion policy +void customize(std::vector& policies) +{ + using o2::framework::CompletionPolicy; + policies.push_back(CompletionPolicyHelpers::defineByName("tpc-cmv-to-vector", CompletionPolicy::CompletionOp::Consume)); +} + +// we need to add workflow options before including Framework/runDataProcessing +void customize(std::vector& workflowOptions) +{ + std::string crusDefault = "0-" + std::to_string(CRU::MaxCRU - 1); + + std::vector options{ + {"input-spec", VariantType::String, "A:TPC/RAWDATA", {"selection string input specs"}}, + {"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings (e.g.: 'TPCCalibPedestal.FirstTimeBin=10;...')"}}, + {"configFile", VariantType::String, "", {"configuration file for configurable parameters"}}, + {"crus", VariantType::String, crusDefault.c_str(), {"List of TPC crus, comma separated ranges, e.g. 0-3,7,9-15"}}, + }; + + std::swap(workflowOptions, options); +} + +#include "Framework/runDataProcessing.h" + +WorkflowSpec defineDataProcessing(ConfigContext const& config) +{ + + using namespace o2::tpc; + + // set up configuration + o2::conf::ConfigurableParam::updateFromFile(config.options().get("configFile")); + o2::conf::ConfigurableParam::updateFromString(config.options().get("configKeyValues")); + o2::conf::ConfigurableParam::writeINI("o2tpccmv_configuration.ini"); + + const std::string inputSpec = config.options().get("input-spec"); + + const auto crus = o2::RangeTokenizer::tokenize(config.options().get("crus")); + + WorkflowSpec workflow; + + workflow.emplace_back(getCMVToVectorSpec(inputSpec, crus)); + + return workflow; +} From 0ac4575565d7a5f62f1cc415bd22c6015ec4519d Mon Sep 17 00:00:00 2001 From: tubagundem Date: Thu, 5 Mar 2026 15:38:06 +0100 Subject: [PATCH 2/3] Updated CMV header and fixed payload size comparison --- .../TPC/include/DataFormatsTPC/CMV.h | 69 +++++++++++++------ Detectors/TPC/workflow/CMakeLists.txt | 6 +- .../TPC/workflow/src/CMVToVectorSpec.cxx | 50 ++++++++------ 3 files changed, 78 insertions(+), 47 deletions(-) diff --git a/DataFormats/Detectors/TPC/include/DataFormatsTPC/CMV.h b/DataFormats/Detectors/TPC/include/DataFormatsTPC/CMV.h index 38ebe04e1961d..566ed521a32bc 100644 --- a/DataFormats/Detectors/TPC/include/DataFormatsTPC/CMV.h +++ b/DataFormats/Detectors/TPC/include/DataFormatsTPC/CMV.h @@ -13,8 +13,8 @@ /// @author Tuba Gündem, tuba.gundem@cern.ch /// @brief Common mode values data format definition -/// The data is sent by the CRU as 96 bit words. The CMV data layout is as follows: -/// - 80-bit Header: [version:8][packetID:8][errorCode:8][magicWord:8][heartbeatOrbit:32][heartbeatBC:16] +/// The data is sent by the CRU as 256+16 bit words. The CMV data layout is as follows: +/// - 256-bit Header: [version:8][packetID:8][errorCode:8][magicWord:8][heartbeatOrbit:32][heartbeatBC:16][padding:176] /// - 16-bit CMV value: [CMV:16] #ifndef ALICEO2_DATAFORMATSTPC_CMV_H @@ -25,47 +25,72 @@ namespace o2::tpc::cmv { -static constexpr uint32_t NTimeBins = 3564; ///< number of time bins (spans 8 orbits) -static constexpr uint32_t SignificantBits = 2; ///< number of bits used for floating point precision -static constexpr float FloatConversion = 1.f / float(1 << SignificantBits); ///< conversion factor from integer representation to float +static constexpr uint32_t NTimeBins = 3564; ///< number of time bins (spans 8 orbits) +static constexpr uint32_t SignificantBits = 2; ///< number of bits used for floating point precision +static constexpr float FloatConversion = 1.f / float(1 << SignificantBits); ///< conversion factor from integer representation to float /// Header definition of the CMVs struct Header { static constexpr uint8_t MagicWord = 0xDC; union { - uint32_t word0 = 0; ///< bits 0 - 31 + uint32_t word0 = 0; ///< bits 0 - 31 struct { - uint8_t version : 8; ///< version - uint8_t packetID : 8; ///< packet id - uint8_t errorCode : 8; ///< errors - uint8_t magicWord : 8; ///< magic word + uint8_t version : 8; ///< version + uint8_t packetID : 8; ///< packet id + uint8_t errorCode : 8; ///< errors + uint8_t magicWord : 8; ///< magic word }; }; union { - uint32_t word1 = 0; ///< bits 32 - 63 + uint32_t word1 = 0; ///< bits 32 - 63 struct { - uint32_t heartbeatOrbit : 32; ///< first heart beat timing of the package + uint32_t heartbeatOrbit : 32; ///< first heart beat timing of the package }; }; union { - uint16_t word2 = 0; ///< bits 64 - 79 + uint16_t word2 = 0; ///< bits 64 - 79 struct { - uint16_t heartbeatBC : 16; ///< first BC id of the package + uint16_t heartbeatBC : 16; ///< first BC id of the package + }; + }; + union { + uint16_t word3 = 0; ///< bits 80 - 95 + struct { + uint16_t unused1 : 16; ///< reserved + }; + }; + union { + uint32_t word4 = 0; ///< bits 96 - 127 + struct { + uint32_t unused2 : 32; ///< reserved + }; + }; + union { + uint64_t word5 = 0; ///< bits 128 - 191 + struct { + uint64_t unused3 : 64; ///< reserved + }; + }; + union { + uint64_t word6 = 0; ///< bits 192 - 255 + struct { + uint64_t unused4 : 64; ///< reserved }; }; }; /// CMV single data container struct Data { - uint16_t CMV{0}; ///< 16bit ADC value - + uint16_t CMV{0}; ///< 16bit ADC value + // Raw integer accessors uint16_t getCMV() const { return CMV; } void setCMV(uint16_t value) { CMV = value; } - + // Float helpers using SignificantBits for fixed-point conversion float getCMVFloat() const { return static_cast(CMV) * FloatConversion; } - void setCMVFloat(float value) { + void setCMVFloat(float value) + { // round to nearest representable fixed-point value setCMV(uint32_t((value + 0.5f * FloatConversion) / FloatConversion)); } @@ -73,8 +98,8 @@ struct Data { /// CMV full data container: one packet carries NTimeBins time bins struct Container { - Header header; ///< CMV data header - Data data[NTimeBins]; ///< data values for given number of time bins + Header header; ///< CMV data header + Data data[NTimeBins]; ///< data values for given number of time bins // Header and data accessors const Header& getHeader() const { return header; } @@ -89,8 +114,8 @@ struct Container { float getCMVFloat(uint32_t timeBin) const { return data[timeBin].getCMVFloat(); } void setCMVFloat(uint32_t timeBin, float value) { data[timeBin].setCMVFloat(value); } - }; + } // namespace o2::tpc::cmv -#endif +#endif \ No newline at end of file diff --git a/Detectors/TPC/workflow/CMakeLists.txt b/Detectors/TPC/workflow/CMakeLists.txt index 92c35f399552e..12654dce9568c 100644 --- a/Detectors/TPC/workflow/CMakeLists.txt +++ b/Detectors/TPC/workflow/CMakeLists.txt @@ -289,8 +289,6 @@ o2_add_executable(pressure-temperature SOURCES src/tpc-pressure-temperature.cxx PUBLIC_LINK_LIBRARIES O2::TPCWorkflow) -add_subdirectory(readers) - o2_add_executable(cmv-to-vector COMPONENT_NAME tpc SOURCES src/tpc-cmv-to-vector.cxx @@ -309,4 +307,6 @@ o2_add_executable(cmv-to-vector # o2_add_executable(cmv-producer # COMPONENT_NAME tpc # SOURCES src/tpc-cmv-producer.cxx -# PUBLIC_LINK_LIBRARIES O2::TPCWorkflow) \ No newline at end of file +# PUBLIC_LINK_LIBRARIES O2::TPCWorkflow) + +add_subdirectory(readers) \ No newline at end of file diff --git a/Detectors/TPC/workflow/src/CMVToVectorSpec.cxx b/Detectors/TPC/workflow/src/CMVToVectorSpec.cxx index 03725ef7c4988..0ab16bb83a629 100644 --- a/Detectors/TPC/workflow/src/CMVToVectorSpec.cxx +++ b/Detectors/TPC/workflow/src/CMVToVectorSpec.cxx @@ -93,7 +93,7 @@ class CMVToVectorDevice : public o2::framework::Task // open files if necessary if ((mWriteDebug || mWriteDebugOnError) && !mDebugStream) { const auto debugFileName = fmt::format(fmt::runtime(mDebugStreamFileName), fmt::arg("run", runNumber)); - LOGP(info, "creating debug stream {}", debugFileName); + LOGP(info, "Creating debug stream {}", debugFileName); mDebugStream = std::make_unique(debugFileName.data(), "recreate"); } @@ -103,7 +103,7 @@ class CMVToVectorDevice : public o2::framework::Task rawType = "cmv.raw"; } const auto rawFileName = fmt::format(fmt::runtime(mRawOutputFileName), fmt::arg("run", runNumber), fmt::arg("raw_type", rawType)); - LOGP(info, "creating raw debug file {}", rawFileName); + LOGP(info, "Creating raw debug file {}", rawFileName); mRawOutputFile.open(rawFileName, std::ios::binary); } @@ -118,6 +118,7 @@ class CMVToVectorDevice : public o2::framework::Task tfCounter = dh->tfCounter; const auto subSpecification = dh->subSpecification; auto payloadSize = DataRefUtils::getPayloadSize(ref); + // LOGP(info, "Processing TF {}, subSpecification {}, payloadSize {}", tfCounter, subSpecification, payloadSize); // ---| data loop |--- const gsl::span raw = pc.inputs().get>(ref); @@ -150,10 +151,14 @@ class CMVToVectorDevice : public o2::framework::Task const uint32_t cruID = rdh_utils::getCRU(feeId); const auto detField = RDHUtils::getDetectorField(*rdhPtr); + // LOGP(info, "Detected CMV packet: CRU {}, link {}, feeId {}", cruID, link, feeId); + if ((detField != (decltype(detField))RawDataType::CMV) || (link != rdh_utils::CMVLinkID)) { + // LOGP(debug, "Skipping packet: detField {}, (expected RawDataType {}), link {}, (expected CMVLinkID {})", detField, (decltype(detField))RawDataType::CMV, link, rdh_utils::CMVLinkID); continue; } - LOGP(debug, "CMV Processing firstTForbit {:9}, tfCounter {:5}, run {:6}, feeId {:6} ({:3}/-/{:2})", dh->firstTForbit, dh->tfCounter, dh->runNumber, feeId, cruID, link); + + LOGP(debug, "Processing firstTForbit {:9}, tfCounter {:5}, run {:6}, feeId {:6}, cruID {:3}, link {:2}", dh->firstTForbit, dh->tfCounter, dh->runNumber, feeId, cruID, link); if (std::find(mCRUs.begin(), mCRUs.end(), cruID) == mCRUs.end()) { LOGP(error, "CMV CRU {:3} not configured in CRUs, skipping", cruID); @@ -163,7 +168,10 @@ class CMVToVectorDevice : public o2::framework::Task auto& cmvVec = mCMVvectors[cruID]; auto& infoVec = mCMVInfos[cruID]; - assert(size == sizeof(cmv::Container)); + if (size != sizeof(cmv::Container)) { + LOGP(error, "CMV packet size mismatch: got {} bytes, expected {} bytes (sizeof cmv::Container). Skipping package.", size, sizeof(cmv::Container)); + return; + } auto data = it.data(); auto& cmvs = *((cmv::Container*)(data)); const uint32_t orbit = cmvs.header.heartbeatOrbit; @@ -174,6 +182,7 @@ class CMVToVectorDevice : public o2::framework::Task cmvVec.reserve(cmvVec.size() + cmv::NTimeBins); for (uint32_t tb = 0; tb < cmv::NTimeBins; ++tb) { cmvVec.push_back(cmvs.getCMVFloat(tb)); + // LOGP(info, "Appended CMV {} for timebin {}, CRU {}, orbit {}, bc {}", cmvs.getCMVFloat(tb), tb, cruID, orbit, bc); } } } catch (const std::exception& e) { @@ -263,23 +272,21 @@ class CMVToVectorDevice : public o2::framework::Task bool matches(uint32_t orbit, int16_t bc) const { return ((heartbeatOrbit == orbit) && (heartbeatBC == bc)); } }; - int mRawDataType{0}; ///< type of raw data to dump in case of errors - bool mWriteDebug{false}; ///< write a debug output - bool mWriteDebugOnError{false}; ///< write a debug output in case of errors - bool mWriteRawDataOnError{false}; ///< write raw data in case of errors - std::vector mCRUs; ///< CRUs expected for this device - std::unordered_map> mCMVvectors; ///< decoded CMVs per cru over all CMV packets in the TF - std::unordered_map> mCMVInfos; ///< CMV packet information within the TF - std::string mDebugStreamFileName; ///< name of the debug stream output file - std::unique_ptr mDebugStream; ///< debug output streamer - std::ofstream mRawOutputFile; ///< raw output file - std::string mRawOutputFileName; ///< name of the raw output file + int mRawDataType{0}; ///< type of raw data to dump in case of errors + bool mWriteDebug{false}; ///< write a debug output + bool mWriteDebugOnError{false}; ///< write a debug output in case of errors + bool mWriteRawDataOnError{false}; ///< write raw data in case of errors + std::vector mCRUs; ///< CRUs expected for this device + std::unordered_map> mCMVvectors; ///< decoded CMVs per cru over all CMV packets in the TF + std::unordered_map> mCMVInfos; ///< CMV packet information within the TF + std::string mDebugStreamFileName; ///< name of the debug stream output file + std::unique_ptr mDebugStream; ///< debug output streamer + std::ofstream mRawOutputFile; ///< raw output file + std::string mRawOutputFileName; ///< name of the raw output file //____________________________________________________________________________ bool snapshotCMVs(DataAllocator& output, uint32_t tfCounter) { - LOGP(debug, "snapshotCMVs"); - bool hasErrors = false; // send data per CRU with its own orbit/BC vector @@ -288,11 +295,11 @@ class CMVToVectorDevice : public o2::framework::Task const auto& infVec = mCMVInfos[cru]; if (infVec.size() != 4) { - LOGP(warning, "CMV CRU {:3}: expected 4 packets per TF, got {}", cru, infVec.size()); + LOGP(warning, "CRU {:3}: expected 4 packets per TF, got {}", cru, infVec.size()); hasErrors = true; } if (cmvVec.size() != cmv::NTimeBins * infVec.size()) { - LOGP(warning, "CMV CRU {:3}: vector size {} does not match expected {}", cru, cmvVec.size(), cmv::NTimeBins * infVec.size()); + LOGP(warning, "CRU {:3}: vector size {} does not match expected {}", cru, cmvVec.size(), cmv::NTimeBins * infVec.size()); hasErrors = true; } @@ -336,17 +343,16 @@ class CMVToVectorDevice : public o2::framework::Task if (mCMVInfos.find(cru) == mCMVInfos.end()) { continue; } - + auto& infos = mCMVInfos[cru]; auto& cmvVec = mCMVvectors[cru]; - + stream << "cru=" << cru << "tfCounter=" << tfCounter << "nCMVs=" << cmvVec.size() << "cmvs=" << cmvVec << "\n"; } - } void writeRawData(InputRecord& inputs) From e669f512392da41dabfe31fc6011963b0206c941 Mon Sep 17 00:00:00 2001 From: tubagundem Date: Thu, 5 Mar 2026 15:49:23 +0100 Subject: [PATCH 3/3] Fixed formatting --- Detectors/TPC/base/include/TPCBase/RDHUtils.h | 2 +- Detectors/TPC/workflow/include/TPCWorkflow/CMVToVectorSpec.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Detectors/TPC/base/include/TPCBase/RDHUtils.h b/Detectors/TPC/base/include/TPCBase/RDHUtils.h index ea4e1fe792a39..71b5d16b85702 100644 --- a/Detectors/TPC/base/include/TPCBase/RDHUtils.h +++ b/Detectors/TPC/base/include/TPCBase/RDHUtils.h @@ -13,7 +13,7 @@ #define AliceO2_TPC_RDHUtils_H #include "DetectorsRaw/RDHUtils.h" -//#include "Headers/RAWDataHeader.h" +// #include "Headers/RAWDataHeader.h" namespace o2 { diff --git a/Detectors/TPC/workflow/include/TPCWorkflow/CMVToVectorSpec.h b/Detectors/TPC/workflow/include/TPCWorkflow/CMVToVectorSpec.h index 179985cea969c..fb27dedabd9c4 100644 --- a/Detectors/TPC/workflow/include/TPCWorkflow/CMVToVectorSpec.h +++ b/Detectors/TPC/workflow/include/TPCWorkflow/CMVToVectorSpec.h @@ -28,4 +28,4 @@ o2::framework::DataProcessorSpec getCMVToVectorSpec(const std::string inputSpec, } // end namespace o2::tpc -#endif //TPC_CMVToVectorSpec_H_ +#endif // TPC_CMVToVectorSpec_H_