NRP Core  1.4.1
output_node.h
Go to the documentation of this file.
1 /* * NRP Core - Backend infrastructure to synchronize simulations
2  *
3  * Copyright 2020-2023 NRP Team
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * This project has received funding from the European Union’s Horizon 2020
18  * Framework Programme for Research and Innovation under the Specific Grant
19  * Agreement No. 945539 (Human Brain Project SGA3).
20  */
21 
22 #ifndef OUTPUT_NODE_H
23 #define OUTPUT_NODE_H
24 
25 #include <map>
26 
30 
37 template <class DATA>
38 class OutputNode : public ComputationalNode {
39 public:
40 
44  OutputNode(const std::string &id,
46  bool publishFromCache = false,
47  int maxPortConnections = 0,
48  unsigned int computePeriod = 1) :
50  _publishFormatPolicy(publishFormatPolicy),
52  _maxPortConnections(maxPortConnections),
53  _computePeriod(computePeriod)
54  {
55  // If the node is set to publish 'ALWAYS' or period is != 1, only one connection per port is allowed,
56  // otherwise there is the risk of invalid pointers or msg loss
58  && _maxPortConnections != 1) {
59  NRPLogger::debug("In node \"" + id + "\". When policy publish 'FROM_CACHE' is used and/or period is not set to 1, "
60  "the node can only accept one connection per port. 'maxPortConnections'"
61  "was set to " + std::to_string(_maxPortConnections) + ", setting it to 1");
63  }
64  }
65 
69  template <class T_IN>
70  InputPort<T_IN, DATA>* getOrRegisterInput(const std::string& id)
71  {
72  InputPort<T_IN, DATA>* port = nullptr;
73 
74  if(_isConfigured) {
75  std::string s = "Node '" + this->id() + "' is already configured. Ignoring request to get or register port '" + id + "'.";
76  NRPLogger::info(s);
77  }
78  else if(_inputPorts.count(id)) {
79  port = dynamic_cast<InputPort<T_IN, DATA>*>(_inputPorts.at(id).get());
80  if(!port)
81  throw NRPException::logCreate("In Output node '" + this->id() + "', attempt to register port '" + id +
82  "' under a different data type");
83  }
84  else {
85  using std::placeholders::_1;
86  std::function<void(const DATA *)> f = std::bind(&OutputNode::storeMsg, this, id, _1);
88  _inputPorts.emplace(id, std::shared_ptr<Port>(port));
89  }
90 
91 
92  return port;
93  }
94 
95  unsigned int getComputePeriod()
96  { return _computePeriod; }
97 
98  void setComputePeriod(unsigned int computePeriod)
99  {
100  if(_maxPortConnections != 1 && computePeriod != 1) {
101  NRPLogger::info("In node \"" + this->id() + "\". Request to set compute period to " + std::to_string(computePeriod) + "rejected "
102  "because the node node ports can receive more than one connections");
103  return;
104  }
105 
106  _computePeriod = computePeriod;
107  }
108 
110  { return _publishFromCache; }
111 
112  virtual bool doCompute() const override final
113  { return ComputationalNode::doCompute() || (_computePeriod != 0 && (_nLoop-1) % _computePeriod == 0); }
114 
115 protected:
116 
117  void configure() override
118  {
119  // Allocate space for _storedMsgs. It is assumed that no new ports are registered nor existing ones are subscribed after this call.
120  // To enforce the latter ports can't be accessed or registered after the node is configured.
121  for(auto& [port_id, port]: _inputPorts)
122  _storedMsgs[port_id].reserve(port->subscriptionsSize());
123 
124  _isConfigured = true;
125  }
126 
127  virtual void compute() override final
128  {
129  if(doCompute()) {
130  // TODO: this loop could be possibly parallelized
131  for (auto &[id, v]: _storedMsgs)
132  sendMsgs(id);
133  }
134  }
135 
136  void graphCycleStartCB() override final
137  { _nLoop++; }
138 
142  void storeMsg(const std::string& id,const DATA* data)
143  {
144  if(!data)
145  return;
146  // If _maxPortConnections == 1 always override the last msg, which should be the only stored msg
147  else if(_maxPortConnections == 1 &&
148  _storedMsgs[id].size())
149  _storedMsgs[id][_storedMsgs[id].size()-1] = data;
150  else
151  _storedMsgs[id].push_back(data);
152  }
153 
157  void sendMsgs(const std::string& id)
158  {
160  for (auto m : _storedMsgs[id])
161  sendSingleMsg(id, m);
162  else
163  sendBatchMsg(id, _storedMsgs[id]);
164 
165  // Depending on publish policy, clear cached msgs
166  if(!_publishFromCache)
167  _storedMsgs[id].clear();
168  }
169 
173  virtual void sendSingleMsg(const std::string& id, const DATA* data) = 0;
174 
178  virtual void sendBatchMsg(const std::string& id, const std::vector<const DATA*>& data) = 0;
179 
181  std::map< std::string, std::shared_ptr<Port> > _inputPorts;
183  std::map< std::string, std::vector<const DATA*>> _storedMsgs;
191  bool _isConfigured = false;
192 
198  unsigned int _computePeriod;
200  unsigned int _nLoop = 0;
201 
204 };
205 
206 
207 #endif //OUTPUT_NODE_H
OutputNode::_maxPortConnections
int _maxPortConnections
Maximum number of subscriptions of ports in this node.
Definition: output_node.h:189
OutputNode::_isConfigured
bool _isConfigured
true if the node has been configured, false otherwise
Definition: output_node.h:191
OutputNode::_publishFromCache
bool _publishFromCache
Publish frequency policy used by this node.
Definition: output_node.h:187
OutputNode::sendSingleMsg
virtual void sendSingleMsg(const std::string &id, const DATA *data)=0
Sends out a single msg, to be implemented by derived classes.
OutputNode::_computePeriod
unsigned int _computePeriod
Property specifying the number of loops that passes between executions of this node.
Definition: output_node.h:198
ComputationalNode::Output
@ Output
Definition: computational_node.h:37
OutputNode::publishFromCache
bool publishFromCache()
Definition: output_node.h:109
ComputationalNode::id
const std::string & id() const
Returns the node 'id'.
Definition: computational_node.h:57
NRPLogger::info
static void info(const FormatString &fmt, const Args &...args)
NRP logging function with message formatting for info level.
Definition: nrp_logger.h:138
OutputNode::sendMsgs
void sendMsgs(const std::string &id)
Sends all msgs stored in _storedMsgs['id'] and clears the storage.
Definition: output_node.h:157
OutputNode
Implementation of an output node in the computation graph.
Definition: output_node.h:38
OutputNode::getOrRegisterInput
InputPort< T_IN, DATA > * getOrRegisterInput(const std::string &id)
Gets or register input port to this node and returns a pointer to it.
Definition: output_node.h:70
computational_node.h
OutputNode::OutputNode
OutputNode(const std::string &id, OutputNodePolicies::PublishFormatPolicy publishFormatPolicy=OutputNodePolicies::PublishFormatPolicy::SERIES, bool publishFromCache=false, int maxPortConnections=0, unsigned int computePeriod=1)
Constructor.
Definition: output_node.h:44
OutputNode::setComputePeriod
void setComputePeriod(unsigned int computePeriod)
Definition: output_node.h:98
OutputNode::storeMsg
void storeMsg(const std::string &id, const DATA *data)
Stores a new msg in _storedMsgs['id'].
Definition: output_node.h:142
computational_node_policies.h
OutputNode::getComputePeriod
unsigned int getComputePeriod()
Definition: output_node.h:95
OutputNode::ComputationalNodes_OUTPUT_NODE_Test
friend class ComputationalNodes_OUTPUT_NODE_Test
Definition: output_node.h:202
OutputNodePolicies::SERIES
@ SERIES
Definition: computational_node_policies.h:51
ComputationalNode::doCompute
virtual bool doCompute() const
Tells if this node should be executed in this graph execution cycle, used in some graph execution mod...
Definition: computational_node.h:96
NRPException::logCreate
static EXCEPTION logCreate(LOG_EXCEPTION_T &exception, const std::string &msg, NRPLogger::spdlog_out_fcn_t spdlogCall=NRPLogger::critical)
Definition: nrp_exceptions.h:73
OutputNode::ComputationalGraphPythonNodes_PYTHON_DECORATORS_BASIC_Test
friend class ComputationalGraphPythonNodes_PYTHON_DECORATORS_BASIC_Test
Definition: output_node.h:203
OutputNode::_inputPorts
std::map< std::string, std::shared_ptr< Port > > _inputPorts
List of ports owned by this node.
Definition: output_node.h:181
OutputNode::doCompute
virtual bool doCompute() const override final
Tells if this node should be executed in this graph execution cycle, used in some graph execution mod...
Definition: output_node.h:112
InputPort
Implementation of an input port in the computation graph.
Definition: input_port.h:42
python_json_engine.port
port
Definition: python_json_engine.py:197
OutputNode::sendBatchMsg
virtual void sendBatchMsg(const std::string &id, const std::vector< const DATA * > &data)=0
Sends out a vector of msg as a single batch, to be implemented by derived classes.
OutputNode::compute
virtual void compute() override final
Requests the node to execute its computation.
Definition: output_node.h:127
OutputNode::_nLoop
unsigned int _nLoop
Variable counting the number of times this nodes has been asked to execute (1 per loop)
Definition: output_node.h:200
OutputNode::configure
void configure() override
Configures the node making it ready to execute 'compute'.
Definition: output_node.h:117
OutputNode::_publishFormatPolicy
OutputNodePolicies::PublishFormatPolicy _publishFormatPolicy
Send policy used by this node.
Definition: output_node.h:185
NRPLogger::debug
static void debug(const FormatString &fmt, const Args &...args)
NRP logging function with message formatting for debug level.
Definition: nrp_logger.h:127
OutputNode::_storedMsgs
std::map< std::string, std::vector< const DATA * > > _storedMsgs
List of msgs stored in this node.
Definition: output_node.h:183
OutputNodePolicies::PublishFormatPolicy
PublishFormatPolicy
Defines how output nodes send stored msgs.
Definition: computational_node_policies.h:50
ComputationalNode
Base class implementing a node in the computational graph.
Definition: computational_node.h:31
input_port.h
OutputNode::graphCycleStartCB
void graphCycleStartCB() override final
Function called by the Computational Graph at the beginning of a new execution cycle.
Definition: output_node.h:136