NRP Core  1.4.1
output_node.h
Go to the documentation of this file.
1 /* * NRP Core - Backend infrastructure to synchronize simulations
2  *
3  * Copyright 2020-2023 NRP Team
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * This project has received funding from the European Union’s Horizon 2020
18  * Framework Programme for Research and Innovation under the Specific Grant
19  * Agreement No. 945539 (Human Brain Project SGA3).
20  */
21 
22 #ifndef OUTPUT_ENGINE_NODE_H
23 #define OUTPUT_ENGINE_NODE_H
24 
25 #include <boost/python.hpp>
26 #include <iostream>
27 #include <mutex>
28 
30 
33 
37 class OutputEngineNode : public OutputNode<DataPackInterface*> {
38 public:
39 
41 
45  OutputEngineNode(const std::string &id, const std::string &engineName,
46  bool publishFromCache = false,
47  unsigned int computePeriod = 1) :
49  _engineName(engineName)
50  { }
51 
52  std::string typeStr() const override
53  { return "ToEngine"; }
54 
59  {
60  std::lock_guard<std::mutex> lock(_dataMutex);
61 
62  datapacks_set_t dataPacks;
63  for(auto &[id, dataPackRawPtr] : _dataStore)
64  dataPacks.insert(DataPackInterfaceConstSharedPtr(dataPackRawPtr));
65  _dataStore.clear();
66 
67  return dataPacks;
68  }
69 
70 protected:
71 
72  void sendSingleMsg(const std::string& /*id*/, const DataPackInterfacePtr * data) override
73  {
74  std::lock_guard<std::mutex> lock(_dataMutex);
75 
76  // OutputNode already checks for nullptr, but since we have a double pointer here an extra check is needed
77  if(!(*data))
78  return;
79  else if(_engineName != (*data)->engineName())
80  NRPLogger::info("In OutputEngineNode '" + this->_engineName + "'. Received datapack with Id '" + (*data)->name() +
81  "' linked to Engine '" + (*data)->engineName() + "'. This node only accept datapacks linked to Engine '" +
82  this->_engineName +"'. Please check your graph configuration ");
83  else
84  // Datapacks are copied to preserve graph integrity and to ensure that the pointer is valid in next graph cycles
85  _dataStore[(*data)->name()] = (*data)->clone();
86  }
87 
88  void sendBatchMsg(const std::string& /*id*/, const std::vector<const DataPackInterfacePtr*>& /*data*/) override
89  {
90  throw NRPException::logCreate("BATCH update policy is not supported in OutputEngineNode");
91  }
92 
93 private:
94 
96  std::string _engineName;
98  std::mutex _dataMutex;
100  std::map<std::string, DataPackInterface*> _dataStore;
101 
102 };
103 
104 
105 class OutputEngineEdge : public SimpleOutputEdge<DataPackInterface*, OutputEngineNode> {
106 
107 public:
108 
109  OutputEngineEdge(const std::string &keyword, const std::string &address) :
110  SimpleOutputEdge<DataPackInterface*, OutputEngineNode>(keyword, ComputationalNode::parseNodeAddress(address, false).first+"_output",
111  "anonymous_port"+std::to_string(port_n++), false, 1),
112  _engineName(ComputationalNode::parseNodeAddress(address, false).first)
113  {}
114 
115 protected:
116 
118  { return new OutputEngineNode(this->_id, _engineName); }
119 
120 private:
121 
122  std::string _engineName;
123 
124  static size_t port_n;
125 };
126 
127 
128 #endif //OUTPUT_ENGINE_NODE_H
datapacks_set_t
std::set< std::shared_ptr< const DataPackInterface >, DataPackPointerComparator > datapacks_set_t
Definition: datapack_interface.h:219
DataPackInterfaceConstSharedPtr
DataPackInterface::const_shared_ptr DataPackInterfaceConstSharedPtr
Definition: datapack_interface.h:180
OutputEngineNode::OutputEngineNode
OutputEngineNode(const std::string &id, const std::string &engineName, bool publishFromCache=false, unsigned int computePeriod=1)
Definition: output_node.h:45
datapack_interface.h
OutputEdge::_id
std::string _id
Definition: output_edge.h:170
OutputNodePolicies
Definition: computational_node_policies.h:47
OutputNode< DataPackInterface * >::publishFromCache
bool publishFromCache()
Definition: output_node.h:109
output_node.h
ComputationalNode::id
const std::string & id() const
Returns the node 'id'.
Definition: computational_node.h:57
OutputEngineEdge
Definition: output_node.h:105
NRPLogger::info
static void info(const FormatString &fmt, const Args &...args)
NRP logging function with message formatting for info level.
Definition: nrp_logger.h:138
OutputEngineEdge::makeNewNode
OutputEngineNode * makeNewNode() override
Definition: output_node.h:117
OutputEdge
Helper class used to implement Python output edge decorators.
Definition: output_edge.h:36
OutputNode
Implementation of an output node in the computation graph.
Definition: output_node.h:38
OutputEngineNode::sendSingleMsg
void sendSingleMsg(const std::string &, const DataPackInterfacePtr *data) override
Definition: output_node.h:72
output_edge.h
OutputNodePolicies::SERIES
@ SERIES
Definition: computational_node_policies.h:51
NRPException::logCreate
static EXCEPTION logCreate(LOG_EXCEPTION_T &exception, const std::string &msg, NRPLogger::spdlog_out_fcn_t spdlogCall=NRPLogger::critical)
Definition: nrp_exceptions.h:73
OutputEngineNode::getDataPacks
datapacks_set_t getDataPacks()
Definition: output_node.h:58
DataPackInterface
Interface to datapacks.
Definition: datapack_interface.h:89
OutputEngineNode::sendBatchMsg
void sendBatchMsg(const std::string &, const std::vector< const DataPackInterfacePtr * > &) override
Definition: output_node.h:88
OutputNodePolicies::PublishFormatPolicy
PublishFormatPolicy
Defines how output nodes send stored msgs.
Definition: computational_node_policies.h:50
ComputationalNode
Base class implementing a node in the computational graph.
Definition: computational_node.h:31
OutputEngineNode::typeStr
std::string typeStr() const override
Returns the node 'type' as a string.
Definition: output_node.h:52
OutputEngineNode
Output node used to connect the computational graph with an EngineClient.
Definition: output_node.h:37
OutputEngineEdge::OutputEngineEdge
OutputEngineEdge(const std::string &keyword, const std::string &address)
Definition: output_node.h:109